Commit 32526d3b authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Changed the flush policy of successful Retrieve Requests (every 2000 files or every 3 minutes)

Added mount policy to the Repack Request
parent c4144c1f
......@@ -488,7 +488,7 @@ const std::map<cmd_key_t, cmd_val_t> cmdOptions = {
{{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS }, { }},
/*----------------------------------------------------------------------------------------------------*/
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD },
{ opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional() }},
{ opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional(), opt_mountpolicy.optional() }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM }, { opt_vid }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS }, { opt_vid.optional() }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR }, { opt_vid }},
......
......@@ -720,6 +720,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestPending) {
repackRequest.setVid("VIDTest");
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack);
repackRequest.insert();
}
{
......@@ -800,6 +801,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestToExpand) {
repackRequest.setVid("VID2Test");
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack);
repackRequest.insert();
}
{
......@@ -880,6 +882,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandNotFinished) {
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setExpandFinished(false);
repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack);
repackRequest.insert();
}
{
......@@ -961,6 +964,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandFinished) {
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setExpandFinished(true);
repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack);
repackRequest.insert();
}
cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG);
......@@ -1059,6 +1063,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestStarting) {
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setExpandFinished(true);
repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack);
repackRequest.insert();
}
cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG);
......
......@@ -21,6 +21,7 @@
#include "AgentReference.hpp"
#include "RepackQueueAlgorithms.hpp"
#include "Algorithms.hpp"
#include "MountPolicySerDeser.hpp"
#include <google/protobuf/util/json_util.h>
#include <iostream>
......@@ -177,6 +178,19 @@ void RepackRequest::setTotalStats(const cta::SchedulerDatabase::RepackRequest::T
setTotalBytesToRetrieve(totalStatsFiles.totalBytesToRetrieve);
}
void RepackRequest::setMountPolicy(const common::dataStructures::MountPolicy& mp){
checkPayloadWritable();
MountPolicySerDeser mpSerDeser(mp);
mpSerDeser.serialize(*m_payload.mutable_mount_policy());
}
common::dataStructures::MountPolicy RepackRequest::getMountPolicy(){
checkPayloadReadable();
MountPolicySerDeser mpSerDeser;
mpSerDeser.deserialize(m_payload.mount_policy());
return mpSerDeser;
}
void RepackRequest::setStatus(){
checkPayloadWritable();
checkPayloadReadable();
......
......@@ -48,6 +48,9 @@ public:
void setExpandStarted(const bool expandStarted);
void setTotalStats(const cta::SchedulerDatabase::RepackRequest::TotalStatsFiles& totalStatsFiles);
cta::SchedulerDatabase::RepackRequest::TotalStatsFiles getTotalStatsFile();
void setMountPolicy(const common::dataStructures::MountPolicy &mp);
common::dataStructures::MountPolicy getMountPolicy();
/**
* Automatically set the new status of the Repack Request
* regarding multiple parameters
......
......@@ -1052,7 +1052,7 @@ void RetrieveRequest::AsyncJobDeleter::wait() {
RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReportSucceedForRepack(uint32_t copyNb)
{
std::unique_ptr<AsyncJobSucceedForRepackReporter> ret(new AsyncJobSucceedForRepackReporter);
ret->m_updaterCallback = [copyNb](const std::string &in)->std::string{
ret->m_updaterCallback = [&ret,copyNb](const std::string &in)->std::string{
// We have a locked and fetched object, so we just need to work on its representation.
cta::objectstore::serializers::ObjectHeader oh;
if (!oh.ParseFromString(in)) {
......@@ -1085,6 +1085,7 @@ RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReport
return oh.SerializeAsString();
}
}
ret->m_MountPolicy.deserialize(payload.mountpolicy());
throw cta::exception::Exception("In RetrieveRequest::asyncReportSucceedForRepack::lambda(): copyNb not found");
};
ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(),ret->m_updaterCallback));
......
......@@ -33,6 +33,7 @@
#include "common/dataStructures/LifecycleTimings.hpp"
#include "AgentReference.hpp"
#include "SorterArchiveJob.hpp"
#include "MountPolicySerDeser.hpp"
namespace cta {
namespace objectstore {
......@@ -77,6 +78,7 @@ public:
* Wait for the end of the execution of the updater callback
*/
void wait();
MountPolicySerDeser m_MountPolicy;
private:
//Hold the AsyncUpdater that will run asynchronously the m_updaterCallback
std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater;
......
......@@ -576,6 +576,7 @@ message RepackRequest {
//the expansion of the RepackRequest is done or not
required bool is_expand_finished = 11561;
required bool is_expand_started = 11562;
required MountPolicy mount_policy = 11563;
repeated RepackSubRequestPointer subrequests = 11570;
}
......
......@@ -1401,7 +1401,7 @@ OStoreDB::RetrieveQueueItor_t* OStoreDB::getRetrieveJobItorPtr(const std::string
// OStoreDB::queueRepack()
//------------------------------------------------------------------------------
void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL,
common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) {
common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy& mountPolicy, log::LogContext & lc) {
// Prepare the repack request object in memory.
assertAgentAddressSet();
cta::utils::Timer t;
......@@ -1412,6 +1412,7 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL,
rr->setVid(vid);
rr->setType(repackType);
rr->setBufferURL(bufferURL);
rr->setMountPolicy(mountPolicy);
// Try to reference the object in the index (will fail if there is already a request with this VID.
try {
Helpers::registerRepackRequestToIndex(vid, rr->getAddressIfSet(), *m_agentReference, m_objectStore, lc);
......@@ -1863,7 +1864,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
// As usual there are many opportunities for failure.
utils::Timer t;
log::TimingList timingList;
cta::common::dataStructures::MountPolicy mountPolicy;
// 1) Update statistics. As the repack request is protected against double reporting, we can release its lock
// before the next step.
{
......@@ -1880,6 +1881,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
objectstore::ScopedExclusiveLock rrl(m_repackRequest);
timingList.insertAndReset("successStatsLockTime", t);
m_repackRequest.fetch();
mountPolicy = m_repackRequest.getMountPolicy();
timingList.insertAndReset("successStatsFetchTime", t);
m_repackRequest.reportRetriveSuccesses(ssl);
timingList.insertAndReset("successStatsUpdateTime", t);
......@@ -1956,7 +1958,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
sorterArchiveJob.jobDump.copyNb = copyNbToArchive;
sorterArchiveJob.jobDump.tapePool = atar.subrequestInfo.repackInfo.archiveRouteMap[copyNbToArchive];
sorterArchiveJob.jobQueueType = cta::objectstore::JobQueueType::JobsToTransferForRepack;
sorterArchiveJob.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack;
sorterArchiveJob.mountPolicy = mountPolicy;
sorterArchiveJob.previousOwner = atar.subrequestInfo.owner;
sorterArchiveRequest.archiveJobs.push_back(sorterArchiveJob);
}
......@@ -2187,6 +2189,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
m_repackRequest.setTotalStats(totalStatsFiles);
uint64_t fSeq = std::max(maxFSeqLowBound + 1, maxAddedFSeq + 1);
m_repackRequest.setLastExpandedFSeq(fSeq);
common::dataStructures::MountPolicy mountPolicy = m_repackRequest.getMountPolicy();
// We make sure the references to subrequests exist persistently before creating them.
m_repackRequest.commit();
// We keep holding the repack request lock: we need to ensure de deleted boolean of each subrequest does
......@@ -2258,7 +2261,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
// Set the queueing parameters
common::dataStructures::RetrieveFileQueueCriteria fileQueueCriteria;
fileQueueCriteria.archiveFile = rsr.archiveFile;
fileQueueCriteria.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack;
fileQueueCriteria.mountPolicy = mountPolicy;
rr->setRetrieveFileQueueCriteria(fileQueueCriteria);
// Decide of which vid we are going to retrieve from. Here, if we can retrieve from the repack VID, we
// will set the initial recall on it. Retries will we requeue to best VID as usual if needed.
......@@ -3579,11 +3582,13 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
// We will wait on the asynchronously started reports of jobs, queue the retrieve jobs
// for report and remove them from ownership.
// 1) Check the async update result.
common::dataStructures::MountPolicy mountPolicy;
for (auto & sDBJob: jobsBatch) {
auto osdbJob = castFromSchedDBJob(sDBJob);
if (osdbJob->isRepack) {
try {
osdbJob->m_jobSucceedForRepackReporter->wait();
mountPolicy = osdbJob->m_jobSucceedForRepackReporter->m_MountPolicy;
jobsToRequeueForRepackMap[osdbJob->m_repackInfo.repackRequestAddress].emplace_back(osdbJob);
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params(lc);
......@@ -3630,7 +3635,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
for (auto & req: repackRequestQueue.second) {
insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb,
req->archiveFile.tapeFiles.at(req->selectedCopyNb).fSeq, req->archiveFile.fileSize,
cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,
mountPolicy,
serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription});
requestToJobMap[&req->m_retrieveRequest] = req;
}
......
......@@ -331,7 +331,7 @@ public:
/* === Repack requests handling =========================================== */
void queueRepack(const std::string& vid, const std::string& bufferURL,
common::dataStructures::RepackInfo::Type repackType, log::LogContext &logContext) override;
common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext &logContext) override;
std::list<common::dataStructures::RepackInfo> getRepackInfo() override;
CTA_GENERATE_EXCEPTION_CLASS(NoSuchRepackRequest);
......
......@@ -224,8 +224,8 @@ public:
}
void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, log::LogContext& lc) override {
m_OStoreDB.queueRepack(vid, bufferURL, repackType, lc);
void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext& lc) override {
m_OStoreDB.queueRepack(vid, bufferURL, repackType, mountPolicy, lc);
}
std::list<common::dataStructures::RepackInfo> getRepackInfo() override {
......
......@@ -337,13 +337,13 @@ void Scheduler::checkTapeFullBeforeRepack(std::string vid){
// repack
//------------------------------------------------------------------------------
void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid,
const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) {
const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc) {
// Check request sanity
if (vid.empty()) throw exception::UserError("Empty VID name.");
if (bufferURL.empty()) throw exception::UserError("Empty buffer URL.");
utils::Timer t;
checkTapeFullBeforeRepack(vid);
m_db.queueRepack(vid, bufferURL, repackType, lc);
m_db.queueRepack(vid, bufferURL, repackType, mountPolicy, lc);
log::TimingList tl;
tl.insertAndReset("schedulerDbTime", t);
log::ScopedParamContainer params(lc);
......
......@@ -200,7 +200,7 @@ public:
const bool force);
void queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid,
const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc);
const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc);
void cancelRepack(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, log::LogContext & lc);
std::list<cta::common::dataStructures::RepackInfo> getRepacks();
cta::common::dataStructures::RepackInfo getRepack(const std::string &vid);
......
......@@ -388,7 +388,7 @@ public:
/*============ Repack management: user side ================================*/
virtual void queueRepack(const std::string & vid, const std::string & bufferURL,
common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) = 0;
common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc) = 0;
virtual std::list<common::dataStructures::RepackInfo> getRepackInfo() = 0;
virtual common::dataStructures::RepackInfo getRepackInfo(const std::string & vid) = 0;
virtual void cancelRepack(const std::string & vid, log::LogContext & lc) = 0;
......
......@@ -1275,14 +1275,14 @@ TEST_P(SchedulerTest, repack) {
catalogue.createTape(cliId,tape1,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,false, notReadOnly, "Comment");
//The queueing of a repack request should fail if the tape to repack is not full
ASSERT_THROW(scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc),cta::exception::UserError);
ASSERT_THROW(scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,lc),cta::exception::UserError);
//The queueing of a repack request in a vid that does not exist should throw an exception
ASSERT_THROW(scheduler.queueRepack(cliId, "NOT_EXIST", "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc),cta::exception::UserError);
ASSERT_THROW(scheduler.queueRepack(cliId, "NOT_EXIST", "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc),cta::exception::UserError);
catalogue.setTapeFull(cliId,tape1,true);
// Create and then cancel repack
scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc);
scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
{
auto repacks = scheduler.getRepacks();
ASSERT_EQ(1, repacks.size());
......@@ -1294,7 +1294,7 @@ TEST_P(SchedulerTest, repack) {
// Recreate a repack and get it moved to ToExpand
std::string tape2 = "Tape2";
catalogue.createTape(cliId,tape2,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment");
scheduler.queueRepack(cliId, tape2, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc);
scheduler.queueRepack(cliId, tape2, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
{
auto repacks = scheduler.getRepacks();
ASSERT_EQ(1, repacks.size());
......@@ -1335,13 +1335,13 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) {
catalogue.createTape(cliId,tape1,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment");
//Queue the first repack request
scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc);
scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
std::string tape2 = "Tape2";
catalogue.createTape(cliId,tape2,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment");
//Queue the second repack request
scheduler.queueRepack(cliId,tape2,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,lc);
scheduler.queueRepack(cliId,tape2,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
//Test the repack request queued has status Pending
ASSERT_EQ(scheduler.getRepack(tape1).status,common::dataStructures::RepackInfo::Status::Pending);
......@@ -1472,7 +1472,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
for(uint64_t i = 0; i < nbTapesToRepack ; ++i) {
scheduler.queueRepack(admin,allVid.at(i),"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,allVid.at(i),"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
}
scheduler.waitSchedulerDbSubthreadsComplete();
//scheduler.waitSchedulerDbSubthreadsComplete();
......@@ -1789,7 +1789,7 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
scheduler.waitSchedulerDbSubthreadsComplete();
log::TimingList tl;
......@@ -2028,7 +2028,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
scheduler.waitSchedulerDbSubthreadsComplete();
//scheduler.waitSchedulerDbSubthreadsComplete();
......@@ -2274,7 +2274,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
scheduler.waitSchedulerDbSubthreadsComplete();
log::TimingList tl;
......@@ -2568,7 +2568,7 @@ TEST_P(SchedulerTest, expandRepackRequestExpansionTimeLimitReached) {
//one retrieve request
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc);
scheduler.waitSchedulerDbSubthreadsComplete();
log::TimingList tl;
......
......@@ -1805,7 +1805,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) {
// We need to create the drive in the registry before being able to put it up.
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext);
scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext);
// Create the data transfer session
DataTransferConfig castorConf;
castorConf.bufsz = 1024*1024; // 1 MB memory buffers
......
......@@ -238,7 +238,7 @@ private:
/*
* The limit for successful reports to trigger flush.
*/
const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 500;
const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 2000;
/*
* The time limit for successful reports to trigger flush.
......
......@@ -1083,6 +1083,28 @@ void RequestMessage::processRepack_Add(cta::xrd::Response &response)
throw cta::exception::UserError("Must specify the buffer URL using --bufferurl option or using the frontend configuration file.");
}
}
typedef common::dataStructures::MountPolicy MountPolicy;
MountPolicy mountPolicy = MountPolicy::s_defaultMountPolicyForRepack;
auto mountPolicyProvidedByUserOpt = getOptional(OptionString::MOUNT_POLICY);
if(mountPolicyProvidedByUserOpt){
//The user specified a mount policy name for this repack request
std::string mountPolicyProvidedByUser = mountPolicyProvidedByUserOpt.value();
//Get the mountpolicy from the catalogue
typedef std::list<common::dataStructures::MountPolicy> MountPolicyList;
MountPolicyList mountPolicies = m_catalogue.getMountPolicies();
MountPolicyList::const_iterator repackMountPolicyItor = std::find_if(mountPolicies.begin(),mountPolicies.end(),[mountPolicyProvidedByUser](const common::dataStructures::MountPolicy & mp){
return mp.name == mountPolicyProvidedByUser;
});
if(repackMountPolicyItor != mountPolicies.end()){
//The mount policy exists
mountPolicy = *repackMountPolicyItor;
} else {
//The mount policy does not exist, throw a user error
throw cta::exception::UserError("The mount policy name provided does not match any existing mount policy.");
}
}
// Expand, repack, or both ?
cta::common::dataStructures::RepackInfo::Type type;
......@@ -1099,7 +1121,7 @@ void RequestMessage::processRepack_Add(cta::xrd::Response &response)
// Process each item in the list
for(auto it = vid_list.begin(); it != vid_list.end(); ++it) {
m_scheduler.queueRepack(m_cliIdentity, *it, bufferURL, type, m_lc);
m_scheduler.queueRepack(m_cliIdentity, *it, bufferURL, type, mountPolicy , m_lc);
}
response.set_type(cta::xrd::Response::RSP_SUCCESS);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment