Commit 5e687c25 authored by Cedric Caffy's avatar Cedric Caffy
Browse files

Added support for future insertion of the ArchiveRequest's objectstore

address on EOS
parent 633a28d2
......@@ -81,7 +81,7 @@ echo "Preparing CTA configuration for tests"
cta-admin tape rm -v {}
kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin --json archiveroute ls | \
jq '.[] | "-i " + .instance + " -s " + .storageClass + " -c " + (.copyNumber|tostring)' | \
jq '.[] | " -s " + .storageClass + " -c " + (.copyNumber|tostring)' | \
xargs -I{} bash -c "kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin archiveroute rm {}"
kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin --json tapepool ls | \
......@@ -89,10 +89,12 @@ echo "Preparing CTA configuration for tests"
xargs -I{} kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin tapepool rm -n {}
kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin --json storageclass ls | \
jq -r '.[] | "-i " + .diskInstance + " -n " + .name' | \
jq -r '.[] | " -n " + .name' | \
xargs -I{} bash -c "kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin storageclass rm {}"
kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin --json vo ls | \
jq -r '.[] | " --vo " + .name' | \
xargs -I{} bash -c "kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin vo rm {}"
for ((i=0; i<${#TAPEDRIVES_IN_USE[@]}; i++)); do
......
......@@ -609,7 +609,7 @@ OStoreDB::TapeMountDecisionInfoNoLock::~TapeMountDecisionInfoNoLock() {}
//------------------------------------------------------------------------------
// OStoreDB::queueArchive()
//------------------------------------------------------------------------------
void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
std::string OStoreDB::queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) {
assertAgentAddressSet();
cta::utils::Timer timer;
......@@ -656,6 +656,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
m_agentReference->addToOwnership(aReq->getAddressIfSet(), m_objectStore);
double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
aReq->insert();
std::string archiveRequestAddr = aReq->getAddressIfSet();
double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// The request is now safe in the object store. We can now return to the caller and fire (and forget) a thread
// complete the bottom half of it.
......@@ -771,6 +772,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
.add("taskQueueSize", taskQueueSize)
.add("totalTime", agentReferencingTime + insertionTime + taskPostingTime);
logContext.log(log::INFO, "In OStoreDB::queueArchive(): recorded request for queueing (enqueueing posted to thread pool).");
return archiveRequestAddr;
}
void OStoreDB::queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& logContext){
......@@ -1439,6 +1441,10 @@ void OStoreDB::deleteRetrieveRequest(const common::dataStructures::SecurityIdent
throw exception::Exception("Not Implemented");
}
void OStoreDB::deleteArchiveRequest(const common::dataStructures::SecurityIdentity& cliIdentity){
}
//------------------------------------------------------------------------------
// OStoreDB::getRetrieveJobs()
//------------------------------------------------------------------------------
......
......@@ -284,7 +284,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyCompleteOrCanceled);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveQueue);
void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
std::string queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) override;
void queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& logContext) override;
......@@ -328,6 +328,8 @@ public:
void deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& requester,
const std::string& remoteFile) override;
void deleteArchiveRequest(const common::dataStructures::SecurityIdentity & cliIdentity) override;
std::list<cta::common::dataStructures::RetrieveJob> getRetrieveJobs(const std::string &vid) const override;
......
......@@ -106,7 +106,7 @@ public:
m_OStoreDB.ping();
}
void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest& request, const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId& criteria, log::LogContext &logContext) override {
std::string queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest& request, const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId& criteria, log::LogContext &logContext) override {
return m_OStoreDB.queueArchive(instanceName, request, criteria, logContext);
}
......@@ -118,6 +118,10 @@ public:
m_OStoreDB.deleteRetrieveRequest(cliIdentity, remoteFile);
}
void deleteArchiveRequest(const common::dataStructures::SecurityIdentity& cliIdentity) override{
}
std::list<cta::common::dataStructures::RetrieveJob> getRetrieveJobs(const std::string& tapePoolName) const override {
return m_OStoreDB.getRetrieveJobs(tapePoolName);
}
......
......@@ -125,7 +125,7 @@ uint64_t Scheduler::checkAndGetNextArchiveFileId(const std::string &instanceName
//------------------------------------------------------------------------------
// queueArchiveWithGivenId
//------------------------------------------------------------------------------
void Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
std::string Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
const cta::common::dataStructures::ArchiveRequest &request, log::LogContext &lc) {
cta::utils::Timer t;
using utils::postEllipsis;
......@@ -141,7 +141,7 @@ void Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std:
const common::dataStructures::ArchiveFileQueueCriteriaAndFileId catalogueInfo(archiveFileId,
queueCriteria.copyToPoolMap, queueCriteria.mountPolicy);
m_db.queueArchive(instanceName, request, catalogueInfo, lc);
std::string archiveReqAddr = m_db.queueArchive(instanceName, request, catalogueInfo, lc);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("instanceName", instanceName)
......@@ -173,30 +173,7 @@ void Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std:
.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "Queued archive request");
}
void Scheduler::queueArchiveRequestForRepackBatch(std::list<cta::objectstore::ArchiveRequest> &archiveRequests,log::LogContext &lc)
{
for(auto& archiveRequest : archiveRequests){
objectstore::ScopedExclusiveLock rReqL(archiveRequest);
archiveRequest.fetch();
cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile();
rReqL.release();
std::unique_ptr<cta::objectstore::ArchiveRequest> arUniqPtr = cta::make_unique<cta::objectstore::ArchiveRequest>(archiveRequest);
this->m_db.queueArchiveForRepack(std::move(arUniqPtr),lc);
cta::log::TimingList tl;
utils::Timer t;
tl.insOrIncAndReset("schedulerDbTime", t);
log::ScopedParamContainer spc(lc);
spc.add("instanceName", archiveFile.diskInstance)
.add("storageClass", archiveFile.storageClass)
.add("diskFileID", archiveFile.diskFileId)
.add("fileSize", archiveFile.fileSize)
.add("fileId", archiveFile.archiveFileID);
tl.insertOrIncrement("schedulerDbTime",t.secs());
tl.addToLog(spc);
lc.log(log::INFO,"Queued repack archive request");
}
return archiveReqAddr;
}
//------------------------------------------------------------------------------
......@@ -268,7 +245,7 @@ std::string Scheduler::queueRetrieve(
// deleteArchive
//------------------------------------------------------------------------------
void Scheduler::deleteArchive(const std::string &instanceName, const common::dataStructures::DeleteArchiveRequest &request,
log::LogContext & lc) {
log::LogContext & lc) {
// We have different possible scenarios here. The file can be safe in the catalogue,
// fully queued, or partially queued.
// First, make sure the file is not queued anymore.
......
......@@ -137,16 +137,9 @@ public:
* @param lc a log context allowing logging from within the scheduler routine.
* @return
*/
void queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
std::string queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
const cta::common::dataStructures::ArchiveRequest &request, log::LogContext &lc);
/**
* Queue the ArchiveRequests that have been transformed from Repack RetrieveRequests
* @param archiveRequests the list of the ArchiveRequests to queue into the ArchiveQueueToTransferForRepack queue.
* @param lc a log context allowing logging from within the scheduler routine.
*/
void queueArchiveRequestForRepackBatch(std::list<cta::objectstore::ArchiveRequest> &archiveRequests,log::LogContext &lc);
/**
* Queue a retrieve request.
* Throws a UserError exception in case of wrong request parameters (ex. unknown file id)
......
......@@ -116,8 +116,9 @@ public:
* @param criteria The criteria retrieved from the CTA catalogue to be used to
* decide how to queue the request.
* @param logContext context allowing logging db operation
* @returns the objectstore ArchiveRequest address
*/
virtual void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
virtual std::string queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria,
log::LogContext &logContext) = 0;
......@@ -327,6 +328,8 @@ public:
const common::dataStructures::SecurityIdentity &cliIdentity,
const std::string &remoteFile) = 0;
virtual void deleteArchiveRequest(const common::dataStructures::SecurityIdentity & cliIdentity) = 0;
/**
* Returns all of the queued archive jobs. The returned jobs are
* grouped by tape pool and then sorted by creation time, oldest first.
......
......@@ -449,16 +449,19 @@ void RequestMessage::processCLOSEW(const cta::eos::Notification &notification, c
cta::utils::Timer t;
// Queue the request
m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc);
// Create a log entry
cta::log::ScopedParamContainer params(m_lc);
params.add("fileId", archiveFileId);
params.add("schedulerTime", t.secs());
params.add("requesterInstance", notification.wf().requester_instance());
params.add("archiveRequestId",archiveRequestAddr);
m_lc.log(cta::log::INFO, "In RequestMessage::processCLOSEW(): queued file for archive.");
// Set response type
// Set response type and add archive request reference as an extended attribute.
//TODO: store archiveRequestAddr in EOS
//response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr));
response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
......@@ -592,7 +595,7 @@ void RequestMessage::processDELETE(const cta::eos::Notification &notification, c
cta::common::dataStructures::DeleteArchiveRequest request;
request.requester.name = notification.cli().user().username();
request.requester.group = notification.cli().user().groupname();
// CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
// must be converted to a valid uint64_t
......@@ -610,7 +613,7 @@ void RequestMessage::processDELETE(const cta::eos::Notification &notification, c
throw PbException("Invalid archiveFileID " + archiveFileIdStr);
}
// Delete the file from the catalogue
// Delete the file from the catalogue or from the objectstore if archive request is created
cta::utils::Timer t;
m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment