Commit ef4aea8c authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Honoured the individual Retrieve garbage collection for jobs...

Honoured the individual Retrieve garbage collection for jobs ToReportToRepackForSuccess and ToReportToRepackForFailure
parent bd42ed62
......@@ -115,6 +115,7 @@ set(ObjectStoreUnitTests
BackendTest.cpp
RootEntryTest.cpp
RetrieveQueueTest.cpp
RetrieveRequestTest.cpp
GarbageCollectorTest.cpp
AlgorithmsTest.cpp
SorterTest.cpp
......
......@@ -27,6 +27,8 @@
#include "Helpers.hpp"
#include "common/utils/utils.hpp"
#include "LifecycleTimingsSerDeser.hpp"
#include "Sorter.hpp"
#include "AgentWrapper.hpp"
#include <google/protobuf/util/json_util.h>
#include <cmath>
......@@ -84,21 +86,65 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe
using serializers::RetrieveJobStatus;
std::set<std::string> candidateVids;
for (auto &j: m_payload.jobs()) {
if (j.status() == RetrieveJobStatus::RJS_ToTransferForUser) {
// Find the job details in tape file
for (auto &tf: m_payload.archivefile().tapefiles()) {
if (tf.copynb() == j.copynb()) {
candidateVids.insert(tf.vid());
goto found;
switch(j.status()){
case RetrieveJobStatus::RJS_ToTransferForUser:
// Find the job details in tape file
for (auto &tf: m_payload.archivefile().tapefiles()) {
if (tf.copynb() == j.copynb()) {
candidateVids.insert(tf.vid());
goto found;
}
}
}
{
std::stringstream err;
err << "In RetrieveRequest::garbageCollect(): could not find tapefile for copynb " << j.copynb();
throw exception::Exception(err.str());
}
found:;
{
std::stringstream err;
err << "In RetrieveRequest::garbageCollect(): could not find tapefile for copynb " << j.copynb();
throw exception::Exception(err.str());
}
break;
case RetrieveJobStatus::RJS_ToReportToRepackForSuccess:
case RetrieveJobStatus::RJS_ToReportToRepackForFailure:
//We don't have any vid to find, we just need to
//Requeue it into RetrieveQueueToReportToRepackForSuccess or into the RetrieveQueueToReportToRepackForFailure (managed by the sorter)
for (auto &tf: m_payload.archivefile().tapefiles()) {
if (tf.copynb() == j.copynb()) {
Sorter sorter(agentReference,m_objectStore,catalogue);
std::shared_ptr<RetrieveRequest> rr = std::make_shared<RetrieveRequest>(*this);
cta::objectstore::Agent agentRR(getOwner(),m_objectStore);
cta::objectstore::AgentWrapper agentRRWrapper(agentRR);
sorter.insertRetrieveRequest(rr,agentRRWrapper,cta::optional<uint32_t>(tf.copynb()),lc);
std::string retrieveQueueAddress = rr->getRepackInfo().repackRequestAddress;
this->m_exclusiveLock->release();
cta::objectstore::Sorter::MapRetrieve allRetrieveJobs = sorter.getAllRetrieve();
std::list<std::tuple<cta::objectstore::Sorter::RetrieveJob,std::future<void>>> allFutures;
cta::utils::Timer t;
cta::log::TimingList tl;
for(auto& kv: allRetrieveJobs){
for(auto& job: kv.second){
allFutures.emplace_back(std::make_tuple(std::get<0>(job->jobToQueue),std::get<1>(job->jobToQueue).get_future()));
}
}
sorter.flushAll(lc);
tl.insertAndReset("sorterFlushingTime",t);
for(auto& future: allFutures){
//Throw an exception in case of failure
std::get<1>(future).get();
}
log::ScopedParamContainer params(lc);
params.add("jobObject", getAddressIfSet())
.add("fileId", m_payload.archivefile().archivefileid())
.add("queueObject", retrieveQueueAddress)
.add("copynb", tf.copynb())
.add("tapeVid", tf.vid());
tl.addToLog(params);
lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): requeued the repack retrieve request.");
return;
}
}
break;
default:
break;
}
found:;
}
std::string bestVid;
// If no tape file is a candidate, we just need to skip to queueing to the failed queue
......
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include "common/log/DummyLogger.hpp"
#include "BackendVFS.hpp"
#include "RootEntry.hpp"
#include "AgentReference.hpp"
#include "Agent.hpp"
#include "RetrieveRequest.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
#endif
#include "catalogue/DummyCatalogue.hpp"
#include "Algorithms.hpp"
#include "RetrieveQueueAlgorithms.hpp"
#include "RetrieveQueue.hpp"
namespace unitTests {
using namespace cta::objectstore;
TEST(RetrieveRequest, IndividualGarbageCollectionRetrieveRequestToReportToRepackForSuccess) {
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue;
// Here we check that can successfully call RetrieveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
// continue agent creation.
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
// Create an agent to garbage be collected
cta::objectstore::AgentReference agrA("unitTestAgentA", dl);
cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
agA.initialize();
agA.setTimeout_us(0);
agA.insertAndRegisterSelf(lc);
//Create the RetrieveRequest
std::string atfrAddr = agrA.nextId("RetrieveRequest");
agrA.addToOwnership(atfrAddr, be);
cta::objectstore::RetrieveRequest rr(atfrAddr, be);
rr.initialize();
cta::common::dataStructures::RetrieveFileQueueCriteria rqc;
rqc.archiveFile.archiveFileID = 123456789L;
rqc.archiveFile.diskFileId = "eos://diskFile";
rqc.archiveFile.checksumBlob.insert(cta::checksum::NONE, "");
rqc.archiveFile.creationTime = 0;
rqc.archiveFile.reconciliationTime = 0;
rqc.archiveFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
rqc.archiveFile.diskInstance = "eoseos";
rqc.archiveFile.fileSize = 1000;
rqc.archiveFile.storageClass = "sc";
{
cta::common::dataStructures::TapeFile tf;
tf.blockId=0;
tf.fileSize=1;
tf.copyNb=1;
tf.creationTime=time(nullptr);
tf.fSeq=1;
tf.vid="Tape0";
rqc.archiveFile.tapeFiles.push_back(tf);
}
rqc.mountPolicy.archiveMinRequestAge = 1;
rqc.mountPolicy.archivePriority = 1;
rqc.mountPolicy.creationLog.time = time(nullptr);
rqc.mountPolicy.lastModificationLog.time = time(nullptr);
rqc.mountPolicy.maxDrivesAllowed = 1;
rqc.mountPolicy.retrieveMinRequestAge = 1;
rqc.mountPolicy.retrievePriority = 1;
rr.setRetrieveFileQueueCriteria(rqc);
cta::common::dataStructures::RetrieveRequest sReq;
sReq.archiveFileID = rqc.archiveFile.archiveFileID;
sReq.creationLog.time=time(nullptr);
rr.setSchedulerRequest(sReq);
rr.addJob(1, 1, 1, 1);
rr.setJobStatus(1,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess);
rr.setOwner(agA.getAddressIfSet());
rr.setActiveCopyNumber(0);
cta::objectstore::RetrieveRequest::RepackInfo ri;
ri.isRepack = true;
ri.fSeq = 1;
ri.fileBufferURL = "testFileBufferURL";
ri.repackRequestAddress = "repackRequestAddress";
rr.setRepackInfo(ri);
rr.insert();
//Calling the individual garbage collection of the RetrieveRequest
cta::objectstore::ScopedExclusiveLock sel(rr);
ASSERT_NO_THROW(rr.garbageCollect(rr.getOwner(),agentRef,lc,catalogue));
//The Retrieve Request should now be queued in the RetrieveQueueToReportToRepackForSuccess
typedef cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueueToReportToRepackForSuccess> RQTRTRFSAlgo;
RQTRTRFSAlgo algo(be,agentRef);
RQTRTRFSAlgo::PopCriteria criteria;
criteria.files = 1;
auto jobs = algo.popNextBatch(ri.repackRequestAddress, criteria, lc);
ASSERT_FALSE(jobs.elements.empty());
ASSERT_EQ(123456789L,jobs.elements.front().archiveFile.archiveFileID);
ASSERT_EQ(1,jobs.elements.front().archiveFile.tapeFiles.at(1).fSeq);
ASSERT_EQ("Tape0",jobs.elements.front().archiveFile.tapeFiles.at(1).vid);
}
TEST(RetrieveRequest, IndividualGarbageCollectionRetrieveRequestToReportToRepackForFailure) {
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue;
// Here we check that can successfully call RetrieveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
// continue agent creation.
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
// Create an agent to garbage be collected
cta::objectstore::AgentReference agrA("unitTestAgentA", dl);
cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
agA.initialize();
agA.setTimeout_us(0);
agA.insertAndRegisterSelf(lc);
//Create the RetrieveRequest
std::string atfrAddr = agrA.nextId("RetrieveRequest");
agrA.addToOwnership(atfrAddr, be);
cta::objectstore::RetrieveRequest rr(atfrAddr, be);
rr.initialize();
cta::common::dataStructures::RetrieveFileQueueCriteria rqc;
rqc.archiveFile.archiveFileID = 123456789L;
rqc.archiveFile.diskFileId = "eos://diskFile";
rqc.archiveFile.checksumBlob.insert(cta::checksum::NONE, "");
rqc.archiveFile.creationTime = 0;
rqc.archiveFile.reconciliationTime = 0;
rqc.archiveFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
rqc.archiveFile.diskInstance = "eoseos";
rqc.archiveFile.fileSize = 1000;
rqc.archiveFile.storageClass = "sc";
{
cta::common::dataStructures::TapeFile tf;
tf.blockId=0;
tf.fileSize=1;
tf.copyNb=1;
tf.creationTime=time(nullptr);
tf.fSeq=1;
tf.vid="Tape0";
rqc.archiveFile.tapeFiles.push_back(tf);
}
rqc.mountPolicy.archiveMinRequestAge = 1;
rqc.mountPolicy.archivePriority = 1;
rqc.mountPolicy.creationLog.time = time(nullptr);
rqc.mountPolicy.lastModificationLog.time = time(nullptr);
rqc.mountPolicy.maxDrivesAllowed = 1;
rqc.mountPolicy.retrieveMinRequestAge = 1;
rqc.mountPolicy.retrievePriority = 1;
rr.setRetrieveFileQueueCriteria(rqc);
cta::common::dataStructures::RetrieveRequest sReq;
sReq.archiveFileID = rqc.archiveFile.archiveFileID;
sReq.creationLog.time=time(nullptr);
rr.setSchedulerRequest(sReq);
rr.addJob(1, 1, 1, 1);
rr.setJobStatus(1,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForFailure);
rr.setOwner(agA.getAddressIfSet());
rr.setActiveCopyNumber(0);
cta::objectstore::RetrieveRequest::RepackInfo ri;
ri.isRepack = true;
ri.fSeq = 1;
ri.fileBufferURL = "testFileBufferURL";
ri.repackRequestAddress = "repackRequestAddress";
rr.setRepackInfo(ri);
rr.insert();
//Calling the individual garbage collection of the RetrieveRequest
cta::objectstore::ScopedExclusiveLock sel(rr);
ASSERT_NO_THROW(rr.garbageCollect(rr.getOwner(),agentRef,lc,catalogue));
//The Retrieve Request should now be queued in the RetrieveQueueToReportToRepackForFailure
typedef cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueueToReportToRepackForFailure> RQTRTRFFAlgo;
RQTRTRFFAlgo algo(be,agentRef);
RQTRTRFFAlgo::PopCriteria criteria;
criteria.files = 1;
auto jobs = algo.popNextBatch(ri.repackRequestAddress, criteria, lc);
ASSERT_FALSE(jobs.elements.empty());
ASSERT_EQ(123456789L,jobs.elements.front().archiveFile.archiveFileID);
ASSERT_EQ(1,jobs.elements.front().archiveFile.tapeFiles.at(1).fSeq);
ASSERT_EQ("Tape0",jobs.elements.front().archiveFile.tapeFiles.at(1).vid);
}
}
\ No newline at end of file
......@@ -3623,7 +3623,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,
serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription});
requestToJobMap[&req->m_retrieveRequest] = req;
}
}
RQTRTRFSAlgo rQTRTRFSAlgo(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
try {
rQTRTRFSAlgo.referenceAndSwitchOwnership(repackRequestQueue.first, insertedRequests, lc);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment