Commit db7d6879 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Added --disabledtape flag to cta-admin repack add, honoured this new flag

parent eecd104d
......@@ -32,6 +32,7 @@ namespace catalogue {
class DummyCatalogue: public Catalogue {
public:
DummyCatalogue() {}
DummyCatalogue(const cta::catalogue::DummyCatalogue*) {}
virtual ~DummyCatalogue() { }
void createActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& acttivity, double weight, const std::string & comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
......
......@@ -254,7 +254,8 @@ const std::map<std::string, OptionBoolean::Key> boolOptions = {
{ "--justaddcopies", OptionBoolean::JUSTADDCOPIES },
{ "--justretrieve", OptionBoolean::JUSTRETRIEVE },
{ "--log", OptionBoolean::SHOW_LOG_ENTRIES },
{ "--summary", OptionBoolean::SUMMARY }
{ "--summary", OptionBoolean::SUMMARY },
{ "--disabledtape", OptionBoolean::DISABLED }
};
......@@ -350,7 +351,8 @@ const std::map<AdminCmd::Cmd, CmdHelp> cmdHelp = {
" If the --justaddcopies option is set, new (or missing) copies (as defined by the storage class) of the files located on the tape to repack will be created and migrated.\n"
" By default, CTA will migrate AND add new (or missing) copies (as defined by the storage class) of the files located on the tape to repack.\n"
" * The --mountpolicy option allows to give a specific mount policy that will be applied to the repack subrequests (retrieve and archive requests).\n"
" By default, a hardcoded mount policy is applied (every request priorities and minimum request ages = 1)."
" By default, a hardcoded mount policy is applied (every request priorities and minimum request ages = 1).\n"
" * If the --disabledtape flag is set, the tape to repack will be mounted for retrieval even if it is disabled."
"\n\n"
}},
{ AdminCmd::CMD_REQUESTERMOUNTRULE, { "requestermountrule", "rmr", { "add", "ch", "rm", "ls" } }},
......@@ -432,6 +434,7 @@ const Option opt_vo { Option::OPT_STR, "--vo",
const Option opt_vidfile { Option::OPT_STR_LIST, "--vidfile", "-f", " <filename>" };
const Option opt_full { Option::OPT_BOOL, "--full", "-f", " <\"true\" or \"false\">" };
const Option opt_readonly { Option::OPT_BOOL, "--readonly", "-r", " <\"true\" or \"false\">" };
const Option opt_disabled_tape { Option::OPT_FLAG, "--disabledtape", "-d", ""};
......@@ -500,7 +503,7 @@ const std::map<cmd_key_t, cmd_val_t> cmdOptions = {
{{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS }, { }},
/*----------------------------------------------------------------------------------------------------*/
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD },
{ opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional(), opt_mountpolicy.optional() }},
{ opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional(), opt_mountpolicy.optional(), opt_disabled_tape.optional() }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM }, { opt_vid }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS }, { opt_vid.optional() }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR }, { opt_vid }},
......
......@@ -61,6 +61,7 @@ struct RepackInfo {
uint64_t retrievedFiles;
uint64_t archivedFiles;
bool isExpandFinished;
bool forceDisabledTape;
// std::string tag;
// uint64_t totalFiles;
// uint64_t totalSize;
......
......@@ -82,6 +82,21 @@ fi
echo "Reclaiming tape ${VID_TO_REPACK}"
kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape reclaim --vid ${VID_TO_REPACK}
echo "Launching a Repack Request on a disabled tape"
VID_TO_REPACK=$(getFirstVidContainingFiles)
if [ "$VID_TO_REPACK" != "null" ]
then
echo "Marking the tape ${VID_TO_REPACK} as disabled"
kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape ch --disabled true --vid ${VID_TO_REPACK}
echo "Wating 15 seconds so that the RetrieveQueueStatisticsCache is flushed"
sleep 15
echo "Launching the repack request test on VID ${VID_TO_REPACK}
kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} || echo "OK" && exit 1
else
echo "No vid found to repack"
exit 1
fi;
NB_FILES=1153
kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -A || exit 1
......
......@@ -336,7 +336,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
obj.reset();
// Get the list of vids for non failed tape files.
std::set<std::string> candidateVids;
bool isRepack = rr->getRepackInfo().isRepack;
bool disabledTape = rr->getRepackInfo().forceDisabledTape;
for (auto & j: rr->dumpJobs()) {
if(j.status==RetrieveJobStatus::RJS_ToTransferForUser) {
for (auto &tf: rr->getArchiveFile().tapeFiles) {
......@@ -370,7 +370,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
// Back to the transfer case.
std::string vid;
try {
vid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, objectStore, isRepack);
vid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, objectStore, disabledTape);
} catch (Helpers::NoTapeAvailableForRetrieve & ex) {
log::ScopedParamContainer params3(lc);
params3.add("fileId", rr->getArchiveFile().archiveFileID);
......
......@@ -1638,6 +1638,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequestRepackDisabledTape){
cta::objectstore::RetrieveRequest::RepackInfo ri;
ri.isRepack = true;
ri.forceDisabledTape = true;
ri.fSeq = 1;
ri.fileBufferURL = "testFileBufferURL";
ri.repackRequestAddress = "repackRequestAddress";
......
......@@ -364,7 +364,7 @@ void Helpers::getLockedAndFetchedRepackQueue(RepackQueue& queue, ScopedExclusive
// Helpers::selectBestRetrieveQueue()
//------------------------------------------------------------------------------
std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candidateVids, cta::catalogue::Catalogue & catalogue,
objectstore::Backend & objectstore, bool isRepack) {
objectstore::Backend & objectstore, bool forceDisabledTape) {
// We will build the retrieve stats of the non-disable candidate vids here
std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats;
// A promise we create so we can make users wait on it.
......@@ -383,7 +383,7 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
grqsmLock.unlock();
updateFuture.wait();
grqsmLock.lock();
if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && isRepack)) {
if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && forceDisabledTape)) {
candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
}
} else {
......@@ -391,7 +391,7 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
if (g_retrieveQueueStatistics.at(v).updateTime + c_retrieveQueueCacheMaxAge > time(nullptr))
throw std::out_of_range("");
// We're lucky: cache hit (and not stale)
if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && isRepack))
if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && forceDisabledTape))
candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
}
} catch (std::out_of_range &) {
......@@ -430,7 +430,7 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
// Signal to potential waiters
updatePromise.set_value();
// Update our own candidate list if needed.
if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && isRepack)) {
if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && forceDisabledTape)) {
candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
}
}
......@@ -485,6 +485,11 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_
}
}
void Helpers::flushRetrieveQueueStatisticsCache(){
threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);
g_retrieveQueueStatistics.clear();
}
//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatistics
//------------------------------------------------------------------------------
......
......@@ -78,7 +78,7 @@ public:
* to the algorithm, but will help performance drastically for a very similar result
*/
static std::string selectBestRetrieveQueue (const std::set<std::string> & candidateVids, cta::catalogue::Catalogue & catalogue,
objectstore::Backend & objectstore, bool isRepack = false);
objectstore::Backend & objectstore, bool forceDisabledTape = false);
/**
* Gets the retrieve queue statistics for a set of Vids (extracted from the OStoreDB
......@@ -95,6 +95,12 @@ public:
*/
static void updateRetrieveQueueStatisticsCache(const std::string & vid, uint64_t files, uint64_t bytes, uint64_t priority);
/**
* Allows to flush the RetrieveQueueStatisticsCache
* TO BE USED BY UNIT TESTS !
*/
static void flushRetrieveQueueStatisticsCache();
private:
/** Lock for the retrieve queues stats */
static cta::threading::Mutex g_retrieveQueueStatisticsMutex;
......
......@@ -78,6 +78,7 @@ void RepackRequest::initialize() {
m_payload.set_lastexpandedfseq(0);
m_payload.set_is_expand_finished(false);
m_payload.set_is_expand_started(false);
m_payload.set_force_disabled_tape(false);
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
......@@ -145,6 +146,7 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() {
ret.lastExpandedFseq = m_payload.lastexpandedfseq();
ret.userProvidedFiles = m_payload.userprovidedfiles();
ret.isExpandFinished = m_payload.is_expand_finished();
ret.forceDisabledTape = m_payload.force_disabled_tape();
if (m_payload.move_mode()) {
if (m_payload.add_copies_mode()) {
ret.type = RepackInfo::Type::MoveAndAddCopies;
......@@ -192,6 +194,15 @@ common::dataStructures::MountPolicy RepackRequest::getMountPolicy(){
return mpSerDeser;
}
void RepackRequest::setForceDisabledTape(const bool disabledTape){
checkPayloadWritable();
m_payload.set_force_disabled_tape(disabledTape);
}
bool RepackRequest::getForceDisabledTape() {
checkPayloadReadable();
return m_payload.force_disabled_tape();
}
void RepackRequest::setStatus(){
checkPayloadWritable();
checkPayloadReadable();
......@@ -622,6 +633,7 @@ RepackRequest::AsyncOwnerAndStatusUpdater* RepackRequest::asyncUpdateOwnerAndSta
retRef.m_repackInfo.status = (RepackInfo::Status) payload.status();
retRef.m_repackInfo.vid = payload.vid();
retRef.m_repackInfo.repackBufferBaseURL = payload.buffer_url();
retRef.m_repackInfo.forceDisabledTape = payload.force_disabled_tape();
if (payload.move_mode()) {
if (payload.add_copies_mode()) {
retRef.m_repackInfo.type = RepackInfo::Type::MoveAndAddCopies;
......
......@@ -50,6 +50,14 @@ public:
cta::SchedulerDatabase::RepackRequest::TotalStatsFiles getTotalStatsFile();
void setMountPolicy(const common::dataStructures::MountPolicy &mp);
common::dataStructures::MountPolicy getMountPolicy();
/**
* Set the flag disabledTape to allow the mounting of a
* disabled tape for file retrieval
* @param disabledTape if true, the disabled tape will be mounted for retrieval, if false, the
* tape will not be mounted if it is disabled
*/
void setForceDisabledTape(const bool disabledTape);
bool getForceDisabledTape();
/**
* Automatically set the new status of the Repack Request
......
......@@ -153,7 +153,7 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe
// filter on tape availability.
try {
// If we have to fetch the status of the tapes and queued for the non-disabled vids.
bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore,m_payload.isrepack());
bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore,m_payload.repack_info().force_disabled_tape());
goto queueForTransfer;
} catch (Helpers::NoTapeAvailableForRetrieve &) {}
queueForFailure:;
......@@ -625,6 +625,7 @@ void RetrieveRequest::setRepackInfo(const RepackInfo& repackInfo) {
for (auto cntr: repackInfo.copyNbsToRearchive) {
m_payload.mutable_repack_info()->mutable_copy_nbs_to_rearchive()->Add(cntr);
}
m_payload.mutable_repack_info()->set_force_disabled_tape(repackInfo.forceDisabledTape);
m_payload.mutable_repack_info()->set_file_buffer_url(repackInfo.fileBufferURL);
m_payload.mutable_repack_info()->set_repack_request_address(repackInfo.repackRequestAddress);
m_payload.mutable_repack_info()->set_fseq(repackInfo.fSeq);
......@@ -914,6 +915,7 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string
ri.isRepack = true;
ri.repackRequestAddress = payload.repack_info().repack_request_address();
ri.fSeq = payload.repack_info().fseq();
ri.forceDisabledTape = payload.repack_info().force_disabled_tape();
}
// TODO serialization of payload maybe not necessary
oh.set_payload(payload.SerializePartialAsString());
......
......@@ -151,6 +151,7 @@ public:
};
struct RepackInfo {
bool isRepack = false;
bool forceDisabledTape = false;
std::map<uint32_t, std::string> archiveRouteMap;
std::set<uint32_t> copyNbsToRearchive;
std::string repackRequestAddress;
......@@ -173,6 +174,7 @@ public:
rrri.set_file_buffer_url(fileBufferURL);
rrri.set_repack_request_address(repackRequestAddress);
rrri.set_fseq(fSeq);
rrri.set_force_disabled_tape(forceDisabledTape);
}
void deserialize(const cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) {
......@@ -182,6 +184,7 @@ public:
fileBufferURL = rrri.file_buffer_url();
repackRequestAddress = rrri.repack_request_address();
fSeq = rrri.fseq();
forceDisabledTape = rrri.force_disabled_tape();
}
};
private:
......
......@@ -318,7 +318,7 @@ std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequestInfosAcc
std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequestInfosAccessorInterface &requestAccessor, std::set<std::string>& candidateVids, log::LogContext &lc){
std::string vid;
try{
vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore,requestAccessor.getIsRepack());
vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore,requestAccessor.getForceDisabledTape());
} catch (Helpers::NoTapeAvailableForRetrieve & ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", requestAccessor.getArchiveFile().archiveFileID);
......@@ -412,8 +412,8 @@ std::string OStoreRetrieveRequestAccessor::getRepackAddress(){
return m_retrieveRequest->getRepackInfo().repackRequestAddress;
}
bool OStoreRetrieveRequestAccessor::getIsRepack(){
return m_retrieveRequest->getRepackInfo().isRepack;
bool OStoreRetrieveRequestAccessor::getForceDisabledTape(){
return m_retrieveRequest->getRepackInfo().forceDisabledTape;
}
/* END OF RetrieveRequestAccessor CLASS */
......@@ -450,8 +450,8 @@ std::string SorterRetrieveRequestAccessor::getRepackAddress(){
return m_retrieveRequest.repackRequestAddress;
}
bool SorterRetrieveRequestAccessor::getIsRepack(){
return m_retrieveRequest.isRepack;
bool SorterRetrieveRequestAccessor::getForceDisabledTape(){
return m_retrieveRequest.forceDisabledTape;
}
/* END OF SorterRetrieveRequestAccessor CLASS*/
......
......@@ -117,7 +117,7 @@ public:
common::dataStructures::ArchiveFile archiveFile;
std::map<uint32_t, RetrieveJob> retrieveJobs;
std::string repackRequestAddress;
bool isRepack = false;
bool forceDisabledTape = false;
};
/* Retrieve-related methods */
......@@ -230,7 +230,7 @@ class RetrieveRequestInfosAccessorInterface{
virtual ~RetrieveRequestInfosAccessorInterface();
virtual serializers::RetrieveJobStatus getJobStatus(const uint32_t copyNb) = 0;
virtual std::string getRepackAddress() = 0;
virtual bool getIsRepack() = 0;
virtual bool getForceDisabledTape() = 0;
};
class OStoreRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterface{
......@@ -243,7 +243,7 @@ class OStoreRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterfac
const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner);
serializers::RetrieveJobStatus getJobStatus(const uint32_t copyNb);
std::string getRepackAddress();
bool getIsRepack();
bool getForceDisabledTape();
private:
std::shared_ptr<RetrieveRequest> m_retrieveRequest;
};
......@@ -258,7 +258,7 @@ class SorterRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterfac
const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner);
serializers::RetrieveJobStatus getJobStatus(const uint32_t copyNb);
std::string getRepackAddress();
bool getIsRepack();
bool getForceDisabledTape();
private:
Sorter::SorterRetrieveRequest& m_retrieveRequest;
};
......
......@@ -409,6 +409,7 @@ message RetrieveRequestRepackInfo {
required string repack_request_address = 9520;
required string file_buffer_url = 9530;
required uint64 fseq = 9540;
required bool force_disabled_tape = 9560;
}
// The different timings of the lifecycle of a RetrieveRequest (creation time, first select time, request complete)
......@@ -589,6 +590,7 @@ message RepackRequest {
required bool is_expand_finished = 11561;
required bool is_expand_started = 11562;
required MountPolicy mount_policy = 11563;
required bool force_disabled_tape = 11564;
repeated RepackSubRequestPointer subrequests = 11570;
}
......
......@@ -1401,7 +1401,7 @@ OStoreDB::RetrieveQueueItor_t* OStoreDB::getRetrieveJobItorPtr(const std::string
// OStoreDB::queueRepack()
//------------------------------------------------------------------------------
void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL,
common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy& mountPolicy, log::LogContext & lc) {
common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy& mountPolicy, const bool forceDisabledTape,log::LogContext & lc) {
// Prepare the repack request object in memory.
assertAgentAddressSet();
cta::utils::Timer t;
......@@ -1413,6 +1413,7 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL,
rr->setType(repackType);
rr->setBufferURL(bufferURL);
rr->setMountPolicy(mountPolicy);
rr->setForceDisabledTape(forceDisabledTape);
// Try to reference the object in the index (will fail if there is already a request with this VID.
try {
Helpers::registerRepackRequestToIndex(vid, rr->getAddressIfSet(), *m_agentReference, m_objectStore, lc);
......@@ -1635,6 +1636,7 @@ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRepackJobToEx
ret->repackInfo.type = repackInfo.type;
ret->repackInfo.status = repackInfo.status;
ret->repackInfo.repackBufferBaseURL = repackInfo.repackBufferBaseURL;
ret->repackInfo.forceDisabledTape = repackInfo.forceDisabledTape;
return std::move(ret);
}
}
......@@ -2196,6 +2198,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
m_repackRequest.setTotalStats(totalStatsFiles);
uint64_t fSeq = std::max(maxFSeqLowBound + 1, maxAddedFSeq + 1);
common::dataStructures::MountPolicy mountPolicy = m_repackRequest.getMountPolicy();
bool forceDisabledTape = m_repackRequest.getInfo().forceDisabledTape;
// We make sure the references to subrequests exist persistently before creating them.
m_repackRequest.commit();
// We keep holding the repack request lock: we need to ensure de deleted boolean of each subrequest does
......@@ -2260,6 +2263,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
rRRepackInfo.fileBufferURL = rsr.fileBufferURL;
rRRepackInfo.fSeq = rsr.fSeq;
rRRepackInfo.isRepack = true;
rRRepackInfo.forceDisabledTape = forceDisabledTape;
rRRepackInfo.repackRequestAddress = m_repackRequest.getAddressIfSet();
rr->setRepackInfo(rRRepackInfo);
// Set the queueing parameters
......@@ -2276,7 +2280,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
if (tc.vid == repackInfo.vid) {
try {
// Try to select the repack VID from a one-vid list.
Helpers::selectBestRetrieveQueue({repackInfo.vid}, m_oStoreDB.m_catalogue, m_oStoreDB.m_objectStore,true);
Helpers::selectBestRetrieveQueue({repackInfo.vid}, m_oStoreDB.m_catalogue, m_oStoreDB.m_objectStore,repackInfo.forceDisabledTape);
bestVid = repackInfo.vid;
activeCopyNumber = tc.copyNb;
} catch (Helpers::NoTapeAvailableForRetrieve &) {}
......@@ -2288,7 +2292,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
std::set<std::string> candidateVids;
for (auto & tc: rsr.archiveFile.tapeFiles) candidateVids.insert(tc.vid);
try {
bestVid = Helpers::selectBestRetrieveQueue(candidateVids, m_oStoreDB.m_catalogue, m_oStoreDB.m_objectStore,true);
bestVid = Helpers::selectBestRetrieveQueue(candidateVids, m_oStoreDB.m_catalogue, m_oStoreDB.m_objectStore,forceDisabledTape);
} catch (Helpers::NoTapeAvailableForRetrieve &) {
// Count the failure for this subrequest.
notCreatedSubrequests.emplace_back(rsr);
......@@ -4591,13 +4595,13 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log::
"In OStoreDB::RetrieveJob::failTransfer(): no active job after addJobFailure() returned false."
);
}
bool isRepack = m_retrieveRequest.getRepackInfo().isRepack;
bool disabledTape = m_retrieveRequest.getRepackInfo().forceDisabledTape;
m_retrieveRequest.commit();
rel.release();
// Check that the requested retrieve job (for the provided VID) exists, and record the copy number
std::string bestVid = Helpers::selectBestRetrieveQueue(candidateVids, m_oStoreDB.m_catalogue,
m_oStoreDB.m_objectStore,isRepack);
m_oStoreDB.m_objectStore,disabledTape);
auto tf_it = af.tapeFiles.begin();
for( ; tf_it != af.tapeFiles.end() && tf_it->vid != bestVid; ++tf_it) ;
......
......@@ -331,7 +331,7 @@ public:
/* === Repack requests handling =========================================== */
void queueRepack(const std::string& vid, const std::string& bufferURL,
common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext &logContext) override;
common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, const bool forceDisabledTape, log::LogContext &logContext) override;
std::list<common::dataStructures::RepackInfo> getRepackInfo() override;
CTA_GENERATE_EXCEPTION_CLASS(NoSuchRepackRequest);
......
......@@ -68,7 +68,7 @@ namespace {
template <class BackendType>
class OStoreDBWrapper: public cta::objectstore::OStoreDBWrapperInterface {
public:
OStoreDBWrapper(const std::string &context, const std::string &URL = "");
OStoreDBWrapper(const std::string &context, std::unique_ptr<cta::catalogue::Catalogue>& catalogue, const std::string &URL = "");
~OStoreDBWrapper() throw () {}
......@@ -224,8 +224,8 @@ public:
}
void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext& lc) override {
m_OStoreDB.queueRepack(vid, bufferURL, repackType, mountPolicy, lc);
void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, const bool forceDisabledTape, log::LogContext& lc) override {
m_OStoreDB.queueRepack(vid, bufferURL, repackType, mountPolicy, forceDisabledTape, lc);
}
std::list<common::dataStructures::RepackInfo> getRepackInfo() override {
......@@ -278,16 +278,16 @@ public:
private:
std::unique_ptr <cta::log::Logger> m_logger;
std::unique_ptr <cta::objectstore::Backend> m_backend;
std::unique_ptr <cta::catalogue::Catalogue> m_catalogue;
std::unique_ptr <cta::catalogue::Catalogue> & m_catalogue;
cta::OStoreDB m_OStoreDB;
std::unique_ptr<objectstore::AgentReference> m_agentReferencePtr;
};
template <>
OStoreDBWrapper<cta::objectstore::BackendVFS>::OStoreDBWrapper(
const std::string &context, const std::string &URL) :
const std::string &context, std::unique_ptr<cta::catalogue::Catalogue> & catalogue, const std::string &URL) :
m_logger(new cta::log::DummyLogger("", "")), m_backend(new cta::objectstore::BackendVFS()),
m_catalogue(new cta::catalogue::DummyCatalogue),
m_catalogue(catalogue),
m_OStoreDB(*m_backend, *m_catalogue, *m_logger),
m_agentReferencePtr(new objectstore::AgentReference("OStoreDBFactory", *m_logger))
{
......@@ -314,9 +314,9 @@ m_OStoreDB(*m_backend, *m_catalogue, *m_logger),
template <>
OStoreDBWrapper<cta::objectstore::BackendRados>::OStoreDBWrapper(
const std::string &context, const std::string &URL) :
const std::string &context,std::unique_ptr<cta::catalogue::Catalogue> & catalogue, const std::string &URL) :
m_logger(new cta::log::DummyLogger("", "")), m_backend(cta::objectstore::BackendFactory::createBackend(URL, *m_logger).release()),
m_catalogue(new cta::catalogue::DummyCatalogue),
m_catalogue(catalogue),
m_OStoreDB(*m_backend, *m_catalogue, *m_logger),
m_agentReferencePtr(new objectstore::AgentReference("OStoreDBFactory", *m_logger))
{
......@@ -372,8 +372,8 @@ public:
*
* @return A newly created scheduler database object.
*/
std::unique_ptr<SchedulerDatabase> create() const {
return std::unique_ptr<SchedulerDatabase>(new OStoreDBWrapper<BackendType>("UnitTest", m_URL));
std::unique_ptr<SchedulerDatabase> create(std::unique_ptr<cta::catalogue::Catalogue>& catalogue) const {
return std::unique_ptr<SchedulerDatabase>(new OStoreDBWrapper<BackendType>("UnitTest", catalogue, m_URL));
}
private:
......
......@@ -27,6 +27,7 @@
#include "OStoreDB.hpp"
#include "objectstore/BackendRadosTestSwitch.hpp"
#include "MemQueues.hpp"
#include "catalogue/InMemoryCatalogue.hpp"
namespace unitTests {
......@@ -63,8 +64,9 @@ public:
// We do a deep reference to the member as the C++ compiler requires the function to be
// already defined if called implicitly.
const auto &factory = GetParam().dbFactory;
m_catalogue = cta::make_unique<cta::catalogue::DummyCatalogue>(new cta::catalogue::DummyCatalogue);
// Get the OStore DB from the factory.
auto osdb = std::move(factory.create());
auto osdb = std::move(factory.create(m_catalogue));
// Make sure the type of the SchedulerDatabase is correct (it should be an OStoreDBWrapperInterface).
dynamic_cast<cta::objectstore::OStoreDBWrapperInterface *> (osdb.get());
// We know the cast will not fail, so we can safely do it (otherwise we could leak memory).
......@@ -108,7 +110,8 @@ private:
OStoreDBTest & operator= (const OStoreDBTest &) = delete;
std::unique_ptr<cta::objectstore::OStoreDBWrapperInterface> m_db;
std::unique_ptr<cta::catalogue::Catalogue> m_catalogue;
}; // class SchedulerDatabaseTest
TEST_P(OStoreDBTest, getBatchArchiveJob) {
......
......@@ -337,18 +337,19 @@ void Scheduler::checkTapeFullBeforeRepack(std::string vid){
// repack
//------------------------------------------------------------------------------
void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid,
const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc) {
const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, const bool forceDisabledTape, log::LogContext & lc) {
// Check request sanity
if (vid.empty()) throw exception::UserError("Empty VID name.");
if (bufferURL.empty()) throw exception::UserError("Empty buffer URL.");
utils::Timer t;
checkTapeFullBeforeRepack(vid);
m_db.queueRepack(vid, bufferURL, repackType, mountPolicy, lc);
m_db.queueRepack(vid, bufferURL, repackType, mountPolicy, forceDisabledTape, lc);
log::TimingList tl;
tl.insertAndReset("schedulerDbTime", t);
log::ScopedParamContainer params(lc);
params.add("tapeVid", vid)
.add("repackType", toString(repackType))
.add("disabledTape", forceDisabledTape)
.add("bufferURL", bufferURL);
tl.addToLog(params);
lc.log(log::INFO, "In Scheduler::queueRepack(): success.");
......@@ -446,7 +447,6 @@ double Scheduler::getRepackRequestExpansionTimeLimit() const {
// expandRepackRequest
//------------------------------------------------------------------------------
void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList& timingList, utils::Timer& t, log::LogContext& lc) {
std::list<common::dataStructures::ArchiveFile> files;
auto repackInfo = repackRequest->getRepackInfo();
typedef cta::common::dataStructures::RepackInfo::Type RepackType;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment