Commit 26d7ed53 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Insertion of not fetched Archive and Retrieve requests into the Sorter

parent 7781d3ef
...@@ -132,6 +132,7 @@ set (COMMON_LIB_SRC_FILES ...@@ -132,6 +132,7 @@ set (COMMON_LIB_SRC_FILES
threading/Thread.cpp threading/Thread.cpp
threading/Semaphores.cpp threading/Semaphores.cpp
threading/SubProcess.cpp threading/SubProcess.cpp
threading/Async.cpp
utils/GetOptThreadSafe.cpp utils/GetOptThreadSafe.cpp
utils/Regex.cpp utils/Regex.cpp
utils/utils.cpp utils/utils.cpp
...@@ -183,6 +184,7 @@ set (COMMON_UNIT_TESTS_LIB_SRC_FILES ...@@ -183,6 +184,7 @@ set (COMMON_UNIT_TESTS_LIB_SRC_FILES
# threading/ThreadingMPTests.cpp is commented out because of errors caused by libust # threading/ThreadingMPTests.cpp is commented out because of errors caused by libust
threading/ThreadingMTTests.cpp threading/ThreadingMTTests.cpp
threading/ThreadingTests.cpp threading/ThreadingTests.cpp
threading/AsyncTests.cpp
threading/AtomicCounterTest.cpp threading/AtomicCounterTest.cpp
utils/GetOptThreadSafeTest.cpp utils/GetOptThreadSafeTest.cpp
utils/RegexTest.cpp utils/RegexTest.cpp
......
#include <future>
#include "Async.hpp"
#include "Thread.hpp"
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
namespace cta {
Async::ThreadWrapper::ThreadWrapper(std::function<void()> callable):m_callable(callable){}
void Async::ThreadWrapper::run(){
try{
m_callable();
m_promise.set_value();
} catch (...){
m_promise.set_exception(std::current_exception());
}
}
std::future<void> Async::async(std::function<void()> callable){
std::future<void> ret;
Async::ThreadWrapper threadCallable(callable);
ret = threadCallable.m_promise.get_future();
threadCallable.run();
return ret;
}
}
\ No newline at end of file
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef ASYNC_HPP
#define ASYNC_HPP
#include "Thread.hpp"
namespace cta {
/**
* This class holds the necessary to reimplement the std::async function.
* Indeed, helgrind does not like the way how std::async execute the std::function passed in parameter
* (usage of a shared_ptr : https://stackoverflow.com/questions/8393777/current-state-of-drd-and-helgrind-support-for-stdthread/8458482#8458482)
*
* How to use this class :
* auto future = cta::Async::async([]{
* //DO SOME ACTIONS TO EXECUTE ASYNCHRONOUSLY
* });
*/
class Async{
public:
/**
* This method allows to execute asynchronously the callable passed in parameter
* @param callable the callable to execute asynchronously
* @return the future associated to the execution of the callable. If an exception is thrown during the execution
* of the callable, the future.get() will throw this exception
*/
static std::future<void> async(std::function<void()> callable);
private:
class ThreadWrapper: public threading::Thread{
friend Async;
public:
ThreadWrapper(std::function<void()> callable);
void run();
private:
std::function<void()> m_callable;
std::promise<void> m_promise;
};
};
}
#endif /* ASYNC_HPP */
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include "Async.cpp"
namespace unitTests {
using namespace cta;
TEST(cta_async, AsyncCallableTakesTime) {
auto future = cta::Async::async([]{
::sleep(2);
});
ASSERT_NO_THROW(future.get());
}
TEST(cta_async, AsyncCallableDoNothing) {
auto future = cta::Async::async([]{
});
ASSERT_NO_THROW(future.get());
}
TEST(cta_async,AsyncCallableThrowException){
auto future = cta::Async::async([]{
throw cta::exception::Exception("Chuck Norris writes code that optimizes itself.");
});
ASSERT_THROW(future.get(),cta::exception::Exception);
}
}
\ No newline at end of file
...@@ -21,12 +21,14 @@ ...@@ -21,12 +21,14 @@
#include <iostream> #include <iostream>
namespace cta { namespace objectstore { namespace cta { namespace objectstore {
Sorter::Sorter(AgentReference& agentReference, Backend& objectstore, catalogue::Catalogue& catalogue):m_agentReference(agentReference),m_objectstore(objectstore),m_catalogue(catalogue){}
Sorter::~Sorter() { /* SORTER CLASS */
}
Sorter::Sorter(AgentReference& agentReference, Backend& objectstore, catalogue::Catalogue& catalogue):m_agentReference(agentReference),m_objectstore(objectstore),m_catalogue(catalogue){
}
Sorter::~Sorter(){
}
/* Archive related algorithms */ /* Archive related algorithms */
template <typename SpecificQueue> template <typename SpecificQueue>
...@@ -48,7 +50,7 @@ void Sorter::executeArchiveAlgorithm(const std::string tapePool, std::string& qu ...@@ -48,7 +50,7 @@ void Sorter::executeArchiveAlgorithm(const std::string tapePool, std::string& qu
for(auto &failedAR: failure.failedElements){ for(auto &failedAR: failure.failedElements){
try{ try{
std::rethrow_exception(failedAR.failure); std::rethrow_exception(failedAR.failure);
} catch(cta::exception::Exception &e){ } catch(const cta::exception::Exception &e){
uint32_t copyNb = failedAR.element->copyNb; uint32_t copyNb = failedAR.element->copyNb;
std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception()); std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception());
succeededJobs.erase(copyNb); succeededJobs.erase(copyNb);
...@@ -87,31 +89,39 @@ void Sorter::dispatchArchiveAlgorithm(const std::string tapePool, const JobQueue ...@@ -87,31 +89,39 @@ void Sorter::dispatchArchiveAlgorithm(const std::string tapePool, const JobQueue
void Sorter::insertArchiveRequest(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface& previousOwner, log::LogContext& lc){ void Sorter::insertArchiveRequest(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface& previousOwner, log::LogContext& lc){
for(auto& job: archiveRequest->dumpJobs()){ for(auto& job: archiveRequest->dumpJobs()){
insertArchiveJob(archiveRequest,previousOwner,job,lc); ArchiveJob jobToInsert;
jobToInsert.archiveRequest = archiveRequest;
jobToInsert.archiveFile = archiveRequest->getArchiveFile();
jobToInsert.jobDump = job;
jobToInsert.mountPolicy = archiveRequest->getMountPolicy();
jobToInsert.previousOwner = &previousOwner;
try{
jobToInsert.jobQueueType = archiveRequest->getJobQueueType(job.copyNb);
insertArchiveJob(jobToInsert);
} catch(const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("fileId", archiveRequest->getArchiveFile().archiveFileID)
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::ERR,"In Sorter::insertArchiveJob() Failed to determine destination queue for Archive Job.");
}
} }
} }
void Sorter::insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface &previousOwner, ArchiveRequest::JobDump& jobToInsert, log::LogContext & lc){ void Sorter::insertArchiveRequest(const SorterArchiveRequest& archiveRequest, AgentReferenceInterface& previousOwner, log::LogContext& lc) {
auto ajqi = std::make_shared<ArchiveJobQueueInfo>(); for(auto& archiveJob: archiveRequest.archiveJobs){
Sorter::ArchiveJob jobToAdd; ArchiveJob jobToInsert = archiveJob;
jobToAdd.archiveRequest = archiveRequest; jobToInsert.previousOwner = &previousOwner;
jobToAdd.archiveFile = archiveRequest->getArchiveFile(); insertArchiveJob(jobToInsert);
jobToAdd.jobDump.copyNb = jobToInsert.copyNb;
jobToAdd.mountPolicy = archiveRequest->getMountPolicy();
jobToAdd.previousOwner = &previousOwner;
jobToAdd.jobDump.tapePool = jobToInsert.tapePool;
ajqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
try{
threading::MutexLocker mapLocker(m_mutex);
m_archiveQueuesAndRequests[std::make_tuple(jobToInsert.tapePool, archiveRequest->getJobQueueType(jobToInsert.copyNb))].emplace_back(ajqi);
} catch (cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("fileId", archiveRequest->getArchiveFile().archiveFileID)
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::ERR,"In Sorter::insertArchiveJob() Failed to determine destination queue for Archive Job.");
} }
} }
void Sorter::insertArchiveJob(const ArchiveJob& job){
auto ajqi = std::make_shared<ArchiveJobQueueInfo>();
ajqi->jobToQueue = std::make_tuple(job,std::promise<void>());
threading::MutexLocker mapLocker(m_mutex);
m_archiveQueuesAndRequests[std::make_tuple(job.jobDump.tapePool, job.jobQueueType)].emplace_back(ajqi);
}
bool Sorter::flushOneArchive(log::LogContext &lc) { bool Sorter::flushOneArchive(log::LogContext &lc) {
threading::MutexLocker locker(m_mutex); threading::MutexLocker locker(m_mutex);
for(auto & kv: m_archiveQueuesAndRequests){ for(auto & kv: m_archiveQueuesAndRequests){
...@@ -157,7 +167,7 @@ void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueA ...@@ -157,7 +167,7 @@ void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueA
for(auto& failedRR: failure.failedElements){ for(auto& failedRR: failure.failedElements){
try{ try{
std::rethrow_exception(failedRR.failure); std::rethrow_exception(failedRR.failure);
} catch (cta::exception::Exception){ } catch (const cta::exception::Exception&){
uint32_t copyNb = failedRR.element->copyNb; uint32_t copyNb = failedRR.element->copyNb;
std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception()); std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception());
succeededJobs.erase(copyNb); succeededJobs.erase(copyNb);
...@@ -205,26 +215,36 @@ Sorter::RetrieveJob Sorter::createRetrieveJob(std::shared_ptr<RetrieveRequest> r ...@@ -205,26 +215,36 @@ Sorter::RetrieveJob Sorter::createRetrieveJob(std::shared_ptr<RetrieveRequest> r
jobToAdd.previousOwner = previousOwner; jobToAdd.previousOwner = previousOwner;
jobToAdd.jobDump.status = retrieveRequest->getJobStatus(jobToAdd.jobDump.copyNb); jobToAdd.jobDump.status = retrieveRequest->getJobStatus(jobToAdd.jobDump.copyNb);
jobToAdd.fileSize = archiveFile.fileSize; jobToAdd.fileSize = archiveFile.fileSize;
jobToAdd.jobQueueType = retrieveRequest->getQueueType(copyNb); //May throw an exception
return jobToAdd; return jobToAdd;
} }
void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc){ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc){
OStoreRetrieveRequestAccessor requestAccessor(retrieveRequest);
insertRetrieveRequest(requestAccessor, previousOwner, copyNb, lc);
}
void Sorter::insertRetrieveRequest(SorterRetrieveRequest& retrieveRequest, AgentReferenceInterface &previousOwner,cta::optional<uint32_t> copyNb, log::LogContext& lc){
SorterRetrieveRequestAccessor accessor(retrieveRequest);
this->insertRetrieveRequest(accessor,previousOwner,copyNb,lc);
}
void Sorter::insertRetrieveRequest(RetrieveRequestInfosAccessorInterface& accessor, AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc){
if(copyNb == cta::nullopt){ if(copyNb == cta::nullopt){
//The job to queue will be a ToTransfer //The job to queue will be a ToTransfer
std::set<std::string> candidateVidsToTransfer = getCandidateVidsToTransfer(*retrieveRequest); std::set<std::string> candidateVidsToTransfer = getCandidateVidsToTransfer(accessor);
if(!candidateVidsToTransfer.empty()){ if(!candidateVidsToTransfer.empty()){
std::string bestVid = getBestVidForQueueingRetrieveRequest(*retrieveRequest, candidateVidsToTransfer ,lc); std::string bestVid = getBestVidForQueueingRetrieveRequest(accessor, candidateVidsToTransfer ,lc);
for (auto & tf: retrieveRequest->getArchiveFile().tapeFiles) { for (auto & tf: accessor.getArchiveFile().tapeFiles) {
if (tf.second.vid == bestVid) { if (tf.second.vid == bestVid) {
goto vidFound; goto vidFound;
} }
} }
{ {
std::stringstream err; std::stringstream err;
err << "In Sorter::insertRetrieveRequest(): no tape file for requested vid. archiveId=" << retrieveRequest->getArchiveFile().archiveFileID err << "In Sorter::insertRetrieveRequest(): no tape file for requested vid. archiveId=" << accessor.getArchiveFile().archiveFileID
<< " vid=" << bestVid; << " vid=" << bestVid;
throw RetrieveRequestHasNoCopies(err.str()); throw RetrieveRequestHasNoCopies(err.str());
} }
...@@ -233,19 +253,27 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ ...@@ -233,19 +253,27 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ
log::ScopedParamContainer params(lc); log::ScopedParamContainer params(lc);
size_t copyNb = std::numeric_limits<size_t>::max(); size_t copyNb = std::numeric_limits<size_t>::max();
uint64_t fSeq = std::numeric_limits<uint64_t>::max(); uint64_t fSeq = std::numeric_limits<uint64_t>::max();
for (auto & tc: retrieveRequest->getArchiveFile().tapeFiles) { if (tc.second.vid==bestVid) { copyNb=tc.first; fSeq=tc.second.fSeq; } } for (auto & tc: accessor.getArchiveFile().tapeFiles) { if (tc.second.vid==bestVid) { copyNb=tc.first; fSeq=tc.second.fSeq; } }
cta::common::dataStructures::ArchiveFile archiveFile = retrieveRequest->getArchiveFile(); cta::common::dataStructures::ArchiveFile archiveFile = accessor.getArchiveFile();
Sorter::RetrieveJob jobToAdd = createRetrieveJob(retrieveRequest,archiveFile,copyNb,fSeq,&previousOwner); try{
//We are sure that we want to queue a ToTransfer Job Sorter::RetrieveJob jobToAdd = accessor.createRetrieveJob(archiveFile,copyNb,fSeq,&previousOwner);
rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); //We are sure that we want to queue a ToTransfer Job
threading::MutexLocker mapLocker(m_mutex); rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
m_retrieveQueuesAndRequests[std::make_tuple(bestVid,JobQueueType::JobsToTransferForUser)].emplace_back(rjqi); threading::MutexLocker mapLocker(m_mutex);
params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) m_retrieveQueuesAndRequests[std::make_tuple(bestVid,JobQueueType::JobsToTransferForUser)].emplace_back(rjqi);
.add("copyNb", copyNb) params.add("fileId", accessor.getArchiveFile().archiveFileID)
.add("tapeVid", bestVid) .add("copyNb", copyNb)
.add("fSeq", fSeq); .add("tapeVid", bestVid)
lc.log(log::INFO, "Selected vid to be queued for retrieve request."); .add("fSeq", fSeq);
return; lc.log(log::INFO, "Selected vid to be queued for retrieve request.");
return;
} catch (const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("fileId", accessor.getArchiveFile().archiveFileID)
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::ERR, "In Sorter::insertRetrieveRequest() Failed to determine destination queue for retrieve request.");
throw ex;
}
} else { } else {
throw cta::exception::Exception("In Sorter::insertRetrieveRequest(), there is no ToTransfer jobs in the RetrieveRequest. Please provide the copyNb of the job you want to queue."); throw cta::exception::Exception("In Sorter::insertRetrieveRequest(), there is no ToTransfer jobs in the RetrieveRequest. Please provide the copyNb of the job you want to queue.");
} }
...@@ -253,21 +281,21 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ ...@@ -253,21 +281,21 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ
//We want to queue a specific job identified by its copyNb //We want to queue a specific job identified by its copyNb
log::ScopedParamContainer params(lc); log::ScopedParamContainer params(lc);
auto rjqi = std::make_shared<RetrieveJobQueueInfo>(); auto rjqi = std::make_shared<RetrieveJobQueueInfo>();
cta::common::dataStructures::ArchiveFile archiveFile = retrieveRequest->getArchiveFile(); cta::common::dataStructures::ArchiveFile archiveFile = accessor.getArchiveFile();
cta::common::dataStructures::TapeFile jobTapeFile = archiveFile.tapeFiles[copyNb.value()]; cta::common::dataStructures::TapeFile jobTapeFile = archiveFile.tapeFiles[copyNb.value()];
Sorter::RetrieveJob jobToAdd = createRetrieveJob(retrieveRequest,archiveFile,jobTapeFile.copyNb,jobTapeFile.fSeq,&previousOwner);
rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
try{ try{
Sorter::RetrieveJob jobToAdd = accessor.createRetrieveJob(archiveFile,jobTapeFile.copyNb,jobTapeFile.fSeq,&previousOwner);
rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
threading::MutexLocker mapLocker(m_mutex); threading::MutexLocker mapLocker(m_mutex);
m_retrieveQueuesAndRequests[std::make_tuple(jobTapeFile.vid, retrieveRequest->getQueueType(copyNb.value()))].emplace_back(rjqi); m_retrieveQueuesAndRequests[std::make_tuple(jobTapeFile.vid, jobToAdd.jobQueueType)].emplace_back(rjqi);
params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) params.add("fileId", accessor.getArchiveFile().archiveFileID)
.add("copyNb", copyNb.value()) .add("copyNb", copyNb.value())
.add("tapeVid", jobTapeFile.vid) .add("tapeVid", jobTapeFile.vid)
.add("fSeq", jobTapeFile.fSeq); .add("fSeq", jobTapeFile.fSeq);
lc.log(log::INFO, "Selected the vid of the job to be queued for retrieve request."); lc.log(log::INFO, "Selected the vid of the job to be queued for retrieve request.");
} catch (cta::exception::Exception &ex){ } catch (const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc); log::ScopedParamContainer params(lc);
params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) params.add("fileId", accessor.getArchiveFile().archiveFileID)
.add("exceptionMessage", ex.getMessageValue()); .add("exceptionMessage", ex.getMessageValue());
lc.log(log::ERR, "In Sorter::insertRetrieveRequest() Failed to determine destination queue for retrieve request."); lc.log(log::ERR, "In Sorter::insertRetrieveRequest() Failed to determine destination queue for retrieve request.");
throw ex; throw ex;
...@@ -275,7 +303,7 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ ...@@ -275,7 +303,7 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ
} }
} }
std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequest &request){ std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequest& request){
using serializers::RetrieveJobStatus; using serializers::RetrieveJobStatus;
std::set<std::string> candidateVids; std::set<std::string> candidateVids;
for (auto & j: request.dumpJobs()) { for (auto & j: request.dumpJobs()) {
...@@ -286,6 +314,28 @@ std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequest &reques ...@@ -286,6 +314,28 @@ std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequest &reques
return candidateVids; return candidateVids;
} }
std::set<std::string> Sorter::getCandidateVidsToTransfer(const SorterRetrieveRequest& request){
using serializers::RetrieveJobStatus;
std::set<std::string> candidateVids;
for (auto & j: request.retrieveJobs) {
if(j.second.jobDump.status == RetrieveJobStatus::RJS_ToTransferForUser) {
candidateVids.insert(request.archiveFile.tapeFiles.at(j.second.jobDump.copyNb).vid);
}
}
return candidateVids;
}
std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequestInfosAccessorInterface &requestAccessor){
using serializers::RetrieveJobStatus;
std::set<std::string> candidateVids;
for(auto& j: requestAccessor.getJobs()){
if(j.status == RetrieveJobStatus::RJS_ToTransferForUser){
candidateVids.insert(requestAccessor.getArchiveFile().tapeFiles.at(j.copyNb).vid);
}
}
return candidateVids;
}
std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc){ std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc){
std::string vid; std::string vid;
try{ try{
...@@ -299,6 +349,32 @@ std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrie ...@@ -299,6 +349,32 @@ std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrie
return vid; return vid;
} }
std::string Sorter::getBestVidForQueueingRetrieveRequest(const SorterRetrieveRequest& request, std::set<std::string>& candidateVids, log::LogContext &lc){
std::string vid;
try{
vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore);
} catch (Helpers::NoTapeAvailableForRetrieve & ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", request.archiveFile.archiveFileID);
lc.log(log::INFO, "In Sorter::getVidForQueueingRetrieveRequest(): No available tape found.");
throw ex;
}
return vid;
}
std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequestInfosAccessorInterface &requestAccessor, std::set<std::string>& candidateVids, log::LogContext &lc){
std::string vid;
try{
vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore);
} catch (Helpers::NoTapeAvailableForRetrieve & ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", requestAccessor.getArchiveFile().archiveFileID);
lc.log(log::INFO, "In Sorter::getVidForQueueingRetrieveRequest(): No available tape found.");
throw ex;
}
return vid;
}
bool Sorter::flushOneRetrieve(log::LogContext &lc){ bool Sorter::flushOneRetrieve(log::LogContext &lc){
threading::MutexLocker locker(m_mutex); threading::MutexLocker locker(m_mutex);
for(auto & kv: m_retrieveQueuesAndRequests){ for(auto & kv: m_retrieveQueuesAndRequests){
...@@ -328,4 +404,72 @@ void Sorter::flushAll(log::LogContext& lc){ ...@@ -328,4 +404,72 @@ void Sorter::flushAll(log::LogContext& lc){
while(flushOneArchive(lc)){} while(flushOneArchive(lc)){}
} }
/* END OF SORTER CLASS */
/* RetrieveRequestInfosAccessor CLASS */
RetrieveRequestInfosAccessorInterface::RetrieveRequestInfosAccessorInterface(){}
RetrieveRequestInfosAccessorInterface::~RetrieveRequestInfosAccessorInterface(){}
/* END OF RetrieveRequestInfosAccessor CLASS */
/* RetrieveRequestAccessor CLASS */
OStoreRetrieveRequestAccessor::OStoreRetrieveRequestAccessor(std::shared_ptr<RetrieveRequest> retrieveRequest):m_retrieveRequest(retrieveRequest){}
OStoreRetrieveRequestAccessor::~OStoreRetrieveRequestAccessor(){}
std::list<RetrieveRequest::JobDump> OStoreRetrieveRequestAccessor::getJobs(){
return m_retrieveRequest->dumpJobs();
}
common::dataStructures::ArchiveFile OStoreRetrieveRequestAccessor::getArchiveFile(){
return m_retrieveRequest->getArchiveFile();
}
Sorter::RetrieveJob OStoreRetrieveRequestAccessor::createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile,
const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner){
Sorter::RetrieveJob ret;
ret.jobDump.copyNb = copyNb;
ret.fSeq = fSeq;
ret.mountPolicy = m_retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy;
ret.retrieveRequest = m_retrieveRequest;
ret.previousOwner = previousOwner;
ret.jobDump.status = m_retrieveRequest->getJobStatus(ret.jobDump.copyNb);
ret.jobQueueType = m_retrieveRequest->getQueueType(copyNb);
ret.fileSize = archiveFile.fileSize;
return ret;
}
/* END OF RetrieveRequestAccessor CLASS */
/* SorterRetrieveRequestAccessor CLASS */
SorterRetrieveRequestAccessor::SorterRetrieveRequestAccessor(Sorter::SorterRetrieveRequest& request):m_retrieveRequest(request){}
SorterRetrieveRequestAccessor::~SorterRetrieveRequestAccessor(){}
std::list<RetrieveRequest::JobDump> SorterRetrieveRequestAccessor::getJobs(){
std::list<RetrieveRequest::JobDump> ret;
for(auto& kv: m_retrieveRequest.retrieveJobs){
ret.push_back(kv.second.jobDump);
}
return ret;
}
common::dataStructures::ArchiveFile SorterRetrieveRequestAccessor::getArchiveFile(){
return m_retrieveRequest.archiveFile;
}
Sorter::RetrieveJob SorterRetrieveRequestAccessor::createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile,