Commit a80ae270 authored by Cedric Caffy

Added ArchiveRequest cancellation

parent b65f802d
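
For orientation, a minimal sketch of how the new cancellation path might be driven, assuming a Scheduler::deleteArchive(instanceName, request, lc) entry point as suggested by the Scheduler.cpp hunk further below; the instance name and identifiers are hypothetical, only DeleteArchiveRequest's fields come from this diff:

    #include "common/dataStructures/DeleteArchiveRequest.hpp"
    #include "scheduler/Scheduler.hpp"

    // Hedged sketch: cancel an in-flight archival. The caller, instance name
    // and ids are illustrative assumptions.
    void cancelQueuedArchive(cta::Scheduler &scheduler, cta::log::LogContext &lc) {
      cta::common::dataStructures::DeleteArchiveRequest request;
      request.archiveFileID = 4242;                                  // hypothetical file id
      request.address = std::string("ArchiveRequest-hypothetical");  // optional objectstore address
      // With an address set, deleteArchive() first removes the queued
      // ArchiveRequest from the objectstore (cancelArchive), then deletes
      // the file from the catalogue.
      scheduler.deleteArchive("eosdev", request, lc);
    }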
@@ -35,7 +35,7 @@ DeleteArchiveRequest::DeleteArchiveRequest():
//------------------------------------------------------------------------------
bool DeleteArchiveRequest::operator==(const DeleteArchiveRequest &rhs) const {
return requester==rhs.requester
&& archiveFileID==rhs.archiveFileID;
&& archiveFileID==rhs.archiveFileID && address==rhs.address;
}
//------------------------------------------------------------------------------
@@ -50,7 +50,8 @@ bool DeleteArchiveRequest::operator!=(const DeleteArchiveRequest &rhs) const {
//------------------------------------------------------------------------------
std::ostream &operator<<(std::ostream &os, const DeleteArchiveRequest &obj) {
os << "(requester=" << obj.requester
<< " archiveFileID=" << obj.archiveFileID << ")";
<< " archiveFileID=" << obj.archiveFileID
<< " address=" << (obj.address ? obj.address.value() : "null") <<")";
return os;
}
......
@@ -23,6 +23,7 @@
#include <stdint.h>
#include <string>
#include "common/optional.hpp"
#include "common/dataStructures/RequesterIdentity.hpp"
namespace cta {
@@ -43,6 +44,7 @@ struct DeleteArchiveRequest {
RequesterIdentity requester;
uint64_t archiveFileID;
cta::optional<std::string> address;
}; // struct DeleteArchiveRequest
......
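
Since operator== above now also compares the optional address field, its result rests on optional equality semantics; a minimal standalone sketch, assuming cta::optional mirrors std::optional (the standard type is used here):

    #include <cassert>
    #include <optional>
    #include <string>

    // Two optionals compare equal when both are empty, or when both are
    // engaged and hold equal values; empty vs engaged compares unequal.
    int main() {
      std::optional<std::string> a, b;
      assert(a == b);            // both empty
      a = "ArchiveRequest-abc";  // hypothetical objectstore address
      assert(a != b);            // engaged vs empty
      b = "ArchiveRequest-abc";
      assert(a == b);            // both engaged, same value
      return 0;
    }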
@@ -51,7 +51,9 @@ void Sorter::executeArchiveAlgorithm(const std::string tapePool, std::string& qu
try{
std::rethrow_exception(failedAR.failure);
} catch (const cta::objectstore::Backend::NoSuchObject &ex) {
lc.log(log::WARNING,"In Sorter::executeArchiveAlgorithm(), queueing impossible, jobs do not exist in the objectstore.");
log::ScopedParamContainer params(lc);
params.add("fileId",failedAR.element->archiveFile.archiveFileID);
lc.log(log::WARNING,"In Sorter::executeArchiveAlgorithm(), queueing impossible, job do not exist in the objectstore.");
} catch(const cta::exception::Exception &e){
uint32_t copyNb = failedAR.element->copyNb;
std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception());
@@ -168,10 +170,13 @@ void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueA
algo.referenceAndSwitchOwnershipIfNecessary(vid,previousOwner,queueAddress,jobsToAdd,lc);
} catch(typename Algo::OwnershipSwitchFailure &failure){
for(auto& failedRR: failure.failedElements){
try{
try {
std::rethrow_exception(failedRR.failure);
} catch (const cta::objectstore::Backend::NoSuchObject &ex) {
lc.log(log::WARNING,"In Sorter::executeRetrieveAlgorithm(), queueing impossible, jobs do not exist in the objectstore.");
log::ScopedParamContainer params(lc);
params.add("copyNb",failedRR.element->copyNb)
.add("fSeq",failedRR.element->fSeq);
lc.log(log::WARNING,"In Sorter::executeRetrieveAlgorithm(), queueing impossible, job do not exist in the objectstore.");
} catch (const cta::exception::Exception&){
uint32_t copyNb = failedRR.element->copyNb;
std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception());
......
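
Both Sorter hunks above use the same failure-dispatch idiom: the error captured in failedAR.failure / failedRR.failure is a std::exception_ptr, which is rethrown so an ordinary catch ladder can branch on its concrete type. A standalone sketch of the idiom, with standard exceptions standing in for the CTA types:

    #include <exception>
    #include <iostream>
    #include <stdexcept>

    // Rethrow a stored exception_ptr to recover its concrete type.
    void dispatchFailure(std::exception_ptr failure) {
      try {
        std::rethrow_exception(failure);
      } catch (const std::out_of_range &) {
        // Recoverable case (NoSuchObject in the real code): warn and drop.
        std::cout << "warning: object is gone, dropping the job\n";
      } catch (const std::exception &) {
        // Anything else: re-capture and hand it on, as the Sorter does by
        // storing std::current_exception() on the job's promise.
        std::exception_ptr forwarded = std::current_exception();
        std::cout << "fatal: forwarding exception to the job's promise\n";
      }
    }

    int main() {
      try { throw std::out_of_range("no such object"); }
      catch (...) { dispatchFailure(std::current_exception()); }
      return 0;
    }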
@@ -775,132 +775,6 @@ std::string OStoreDB::queueArchive(const std::string &instanceName, const cta::c
return archiveRequestAddr;
}
void OStoreDB::queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& logContext){
assertAgentAddressSet();
auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
auto * mutexForHelgrindAddr = mutexForHelgrind.release();
std::unique_ptr<cta::objectstore::ArchiveRequest> aReqUniqPtr(request.release());
objectstore::ScopedExclusiveLock rReqL(*aReqUniqPtr);
aReqUniqPtr->fetch();
uint64_t taskQueueSize = m_taskQueueSize;
// We create the object here
//m_agentReference->addToOwnership(aReqUniqPtr->getAddressIfSet(), m_objectStore);
utils::Timer timer;
double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// Prepare the logs to avoid multithread access on the object.
log::ScopedParamContainer params(logContext);
cta::common::dataStructures::ArchiveFile aFile = aReqUniqPtr->getArchiveFile();
params.add("jobObject", aReqUniqPtr->getAddressIfSet())
.add("fileId", aFile.archiveFileID)
.add("diskInstance", aFile.diskInstance)
.add("diskFilePath", aFile.diskFileInfo.path)
.add("diskFileId", aFile.diskFileId)
.add("agentReferencingTime", agentReferencingTime);
delayIfNecessary(logContext);
cta::objectstore::ArchiveRequest *aReqPtr = aReqUniqPtr.release();
m_taskQueueSize++;
auto * et = new EnqueueingTask([aReqPtr, mutexForHelgrindAddr, this]{
std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr);
std::unique_ptr<cta::objectstore::ArchiveRequest> aReq(aReqPtr);
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_taskQueueSize--;
m_taskPostingSemaphore.release();
};
// A bit ugly, but we need a non-null pointer for the "deleter" to be called at the end of the thread execution
std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
log::LogContext logContext(m_logger);
utils::Timer timer;
ScopedExclusiveLock arl(*aReq);
aReq->fetch();
timer.secs(cta::utils::Timer::reset_t::resetCounter);
double arTotalQueueUnlockTime = 0;
double arTotalQueueingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// We can now enqueue the requests
std::list<std::string> linkedTapePools;
std::string currentTapepool;
try {
for (auto &j: aReq->dumpJobs()) {
//Queue each job into the ArchiveQueue
double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
currentTapepool = j.tapePool;
linkedTapePools.push_back(j.owner);
auto shareLock = ostoredb::MemArchiveQueue::sharedAddToQueue(j, j.tapePool, *aReq, *this, logContext);
arTotalQueueingTime += qTime;
aReq->commit();
// Now we can let go off the queue.
shareLock.reset();
double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
arTotalQueueUnlockTime += qUnlockTime;
linkedTapePools.push_back(j.owner);
log::ScopedParamContainer params(logContext);
params.add("tapePool", j.tapePool)
.add("queueObject", j.owner)
.add("jobObject", aReq->getAddressIfSet())
.add("queueingTime", qTime)
.add("queueUnlockTime", qUnlockTime);
logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): added job to queue.");
}
} catch (NoSuchArchiveQueue &ex) {
// Unlink the request from already connected tape pools
for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) {
objectstore::ArchiveQueue aq(*tpa, m_objectStore);
ScopedExclusiveLock aql(aq);
aq.fetch();
aq.removeJobsAndCommit({aReq->getAddressIfSet()});
}
aReq->remove();
log::ScopedParamContainer params(logContext);
params.add("tapePool", currentTapepool)
.add("archiveRequestObject", aReq->getAddressIfSet())
.add("exceptionMessage", ex.getMessageValue())
.add("jobObject", aReq->getAddressIfSet());
logContext.log(log::ERR, "In OStoreDB::queueArchiveForRepack(): failed to enqueue job");
return;
}
// The request is now fully set.
double arOwnerResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
double arLockRelease = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// And remove reference from the agent
m_agentReference->removeFromOwnership(aReq->getAddressIfSet(), m_objectStore);
double agOwnershipResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
auto archiveFile = aReq->getArchiveFile();
log::ScopedParamContainer params(logContext);
params.add("jobObject", aReq->getAddressIfSet())
.add("fileId", archiveFile.archiveFileID)
.add("diskInstance", archiveFile.diskInstance)
.add("diskFilePath", archiveFile.diskFileInfo.path)
.add("diskFileId", archiveFile.diskFileId)
.add("totalQueueingTime", arTotalQueueingTime)
.add("totalQueueUnlockTime", arTotalQueueUnlockTime)
.add("ownerResetTime", arOwnerResetTime)
.add("lockReleaseTime", arLockRelease)
.add("agentOwnershipResetTime", agOwnershipResetTime)
.add("totalTime", arTotalQueueingTime
+ arTotalQueueUnlockTime + arOwnerResetTime + arLockRelease
+ agOwnershipResetTime);
logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): Finished enqueueing request.");
});
ANNOTATE_HAPPENS_BEFORE(et);
mlForHelgrind.unlock();
rReqL.release();
m_enqueueingTasksQueue.push(et);
//TODO Time measurement
double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
params.add("taskPostingTime", taskPostingTime)
.add("taskQueueSize", taskQueueSize)
.add("totalTime", agentReferencingTime + taskPostingTime);
logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): recorded request for queueing (enqueueing posted to thread pool).");
}
//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
@@ -964,7 +838,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArch
criteria.files = filesRequested;
AQTRAlgo::PoppedElementsBatch jobs;
for (auto & q: queueList) {
jobs = aqtrAlgo.popNextBatch(q.tapePool, criteria, logContext);
jobs = aqtrAlgo.popNextBatch(q.tapePool, criteria, logContext);
if (!jobs.elements.empty()) break;
}
for (auto & j: jobs.elements) {
@@ -1047,13 +921,33 @@ void OStoreDB::setArchiveJobBatchReported(std::list<cta::SchedulerDatabase::Arch
if (completeJobsToDelete.size()) {
std::list<std::string> jobsToUnown;
// Launch deletion.
for (auto &j: completeJobsToDelete) {
j->asyncDeleteRequest();
auto completeJobsToDeleteItor = completeJobsToDelete.begin();
while(completeJobsToDeleteItor != completeJobsToDelete.end())
{
auto & j = *completeJobsToDeleteItor;
try {
j->asyncDeleteRequest();
} catch(const Backend::NoSuchObject & ex) {
log::ScopedParamContainer params(lc);
params.add("fileId",j->archiveFile.archiveFileID);
lc.log(log::WARNING,"In OStoreDB::setArchiveJobBatchReported() failed to asyncDeleteRequest because it does not exist in the objectstore.");
completeJobsToDelete.erase(completeJobsToDeleteItor);
}
completeJobsToDeleteItor++;
}
timingList.insertAndReset("deleteLaunchTime", t);
for (auto &j: completeJobsToDelete) {
try {
j->waitAsyncDelete();
try {
j->waitAsyncDelete();
} catch(const Backend::NoSuchObject & ex){
//No need to delete from the completeJobsToDelete list
//as it is not used later
log::ScopedParamContainer params(lc);
params.add("fileId",j->archiveFile.archiveFileID);
lc.log(log::WARNING,"In OStoreDB::setArchiveJobBatchReported() failed to asyncDeleteRequest because it does not exist in the objectstore.");
continue;
}
log::ScopedParamContainer params(lc);
params.add("fileId", j->archiveFile.archiveFileID)
.add("objectAddress", j->m_archiveRequest.getAddressIfSet());
@@ -1086,9 +980,20 @@ void OStoreDB::setArchiveJobBatchReported(std::list<cta::SchedulerDatabase::Arch
}
try {
caAQF.referenceAndSwitchOwnership(queue.first, m_agentReference->getAgentAddress(), insertedElements, lc);
} catch (typename CaAQF::OwnershipSwitchFailure &failure) {
for(auto &failedAR: failure.failedElements){
try{
std::rethrow_exception(failedAR.failure);
} catch (const cta::objectstore::Backend::NoSuchObject &ex) {
log::ScopedParamContainer params(lc);
params.add("fileId",failedAR.element->archiveFile.archiveFileID);
lc.log(log::WARNING,"In OStoreDB::setArchiveJobBatchReported(), queueing impossible, job do not exist in the objectstore.");
}
}
} catch (exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("exceptionMSG",ex.getMessageValue());
lc.log(log::ERR,"In OStoreDB::setArchiveJobBatchReported(), failed to queue in the ArchiveQueueFailed.");
}
log::TimingList tl;
tl.insertAndReset("queueAndSwitchStateTime", t);
@@ -1108,10 +1013,21 @@ void OStoreDB::setRetrieveJobBatchReportedToUser(std::list<cta::SchedulerDatabas
// Sort jobs to be updated
std::map<std::string, std::list<FailedJobToQueue>> failedQueues;
for(auto &j : jobsBatch) {
auto jobsBatchItor = jobsBatch.begin();
while(jobsBatchItor != jobsBatch.end()) {
auto & j = *jobsBatchItor;
switch(j->reportType) {
case SchedulerDatabase::RetrieveJob::ReportType::FailureReport: {
j->fail();
try {
j->fail();
} catch (const Backend::NoSuchObject &ex){
log::ScopedParamContainer params(lc);
params.add("fileId",j->archiveFile.archiveFileID)
.add("exceptionMsg",ex.getMessageValue());
lc.log(log::WARNING,"In OStoreDB::setRetrieveJobBatchReportedToUser(), fail to fail the job because it does not exists in the objectstore.");
jobsBatch.erase(jobsBatchItor);
goto next;
}
auto &vid = j->archiveFile.tapeFiles.at(j->selectedCopyNb).vid;
failedQueues[vid].push_back(FailedJobToQueue{ castFromSchedDBJob(j) });
break;
@@ -1123,6 +1039,8 @@
lc.log(log::ERR, "In OStoreDB::setRetrieveJobBatchReportedToUser(): unexpected job status. Leaving the job as-is.");
}
}
next:
jobsBatchItor++;
}
// Put the failed jobs in the failed queue
@@ -1441,8 +1359,50 @@ void OStoreDB::deleteRetrieveRequest(const common::dataStructures::SecurityIdent
throw exception::Exception("Not Implemented");
}
void OStoreDB::deleteArchiveRequest(const common::dataStructures::SecurityIdentity& cliIdentity){
//------------------------------------------------------------------------------
// OStoreDB::cancelArchive()
//------------------------------------------------------------------------------
void OStoreDB::cancelArchive(const common::dataStructures::DeleteArchiveRequest & request, log::LogContext & lc){
if(!request.address){
log::ScopedParamContainer params(lc);
params.add("ArchiveFileID", request.archiveFileID);
lc.log(log::ERR, "In OStoreDB::cancelArchive(): no archive request address provided");
throw exception::Exception("In OStoreDB::cancelArchive(): no archive request address provided");
}
objectstore::ArchiveRequest ar(request.address.value(),m_objectStore);
objectstore::ScopedExclusiveLock sel;
try {
sel.lock(ar);
ar.fetch();
} catch (objectstore::Backend::NoSuchObject &) {
log::ScopedParamContainer params(lc);
params.add("archiveFileId", request.archiveFileID)
.add("archiveRequestId", request.address.value());
lc.log(log::WARNING, "In OStoreDB::cancelArchive(): no such archive request.");
return;
} catch (exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("archiveFileId", request.archiveFileID)
.add("archiveRequestId", request.address.value())
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::cancelArchive(): failed to lock of fetch the archive request.");
throw;
}
// We have the objectstore request. Let's validate that it matches the cancellation request.
if (request.archiveFileID != ar.getArchiveFile().archiveFileID) {
log::ScopedParamContainer params(lc);
params.add("ArchiveFileID", request.archiveFileID)
.add("archiveRequest", request.address.value())
.add("ArchiveFileIdFromRequest", ar.getArchiveFile().archiveFileID);
lc.log(log::ERR, "In OStoreDB::cancelArchive(): archive file Id mismatch.");
throw exception::Exception("In OStoreDB::cancelArchive(): archiveFileID mismatch.");
}
// Looks fine, we delete the request
log::ScopedParamContainer params(lc);
params.add("ArchiveFileID", request.archiveFileID)
.add("archiveRequestId", request.address.value());
lc.log(log::INFO, "OStoreDB::cancelArchive(): will delete the archive request");
ar.remove();
}
//------------------------------------------------------------------------------
@@ -4247,9 +4207,21 @@ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<ct
log::TimingList timingList;
// We will asynchronously report the archive jobs (which MUST be OStoreDBJobs).
// We let the exceptions through as failing to report is fatal.
for (auto & sDBJob: jobsBatch) {
castFromSchedDBJob(sDBJob.get())->asyncSucceedTransfer();
auto jobsBatchItor = jobsBatch.begin();
while(jobsBatchItor != jobsBatch.end()){
try {
castFromSchedDBJob(jobsBatchItor->get())->asyncSucceedTransfer();
jobsBatchItor++;
} catch (cta::objectstore::Backend::NoSuchObject &ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", (*jobsBatchItor)->archiveFile.archiveFileID)
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::WARNING,
"In OStoreDB::ArchiveMount::setJobBatchTransferred(): async succeed transfer failed, job does not exist in the objectstore.");
jobsBatchItor = jobsBatch.erase(jobsBatchItor);
}
}
timingList.insertAndReset("asyncSucceedLaunchTime", t);
// We will only know whether we need to queue the requests for user for reporting after updating request. So on a first
// pass we update the request and on the second, we will queue a batch of them to the report queue. Report queues
@@ -4257,16 +4229,27 @@
// be without contention.
// Jobs that do not require queuing are done from our perspective and we should just remove them from agent ownership.
// Jobs for repack always get reported.
for (auto & sDBJob: jobsBatch) {
castFromSchedDBJob(sDBJob.get())->waitAsyncSucceed();
auto repackInfo = castFromSchedDBJob(sDBJob.get())->getRepackInfoAfterAsyncSuccess();
if (repackInfo.isRepack) {
jobsToQueueForReportingToRepack.insert(castFromSchedDBJob(sDBJob.get()));
} else {
if (castFromSchedDBJob(sDBJob.get())->isLastAfterAsyncSuccess())
jobsToQueueForReportingToUser.insert(castFromSchedDBJob(sDBJob.get()));
else
ajToUnown.push_back(castFromSchedDBJob(sDBJob.get())->m_archiveRequest.getAddressIfSet());
jobsBatchItor = jobsBatch.begin();
while(jobsBatchItor != jobsBatch.end()){
try {
castFromSchedDBJob(jobsBatchItor->get())->waitAsyncSucceed();
auto repackInfo = castFromSchedDBJob(jobsBatchItor->get())->getRepackInfoAfterAsyncSuccess();
if (repackInfo.isRepack) {
jobsToQueueForReportingToRepack.insert(castFromSchedDBJob(jobsBatchItor->get()));
} else {
if (castFromSchedDBJob(jobsBatchItor->get())->isLastAfterAsyncSuccess())
jobsToQueueForReportingToUser.insert(castFromSchedDBJob(jobsBatchItor->get()));
else
ajToUnown.push_back(castFromSchedDBJob(jobsBatchItor->get())->m_archiveRequest.getAddressIfSet());
}
jobsBatchItor++;
} catch (cta::objectstore::Backend::NoSuchObject &ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", (*jobsBatchItor)->archiveFile.archiveFileID)
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::WARNING,
"In OStoreDB::ArchiveMount::setJobBatchTransferred(): wait async succeed transfer failed, job does not exist in the objectstore.");
jobsBatchItor = jobsBatch.erase(jobsBatchItor);
}
}
timingList.insertAndReset("asyncSucceedCompletionTime", t);
......
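
Several loops in this file now erase from a std::list while iterating; the pattern hinges on erase() invalidating only the erased iterator and returning the iterator past it. A distilled, runnable sketch (an even-number predicate stands in for the NoSuchObject catch):

    #include <cassert>
    #include <list>

    int main() {
      std::list<int> jobs{1, 2, 3, 4};
      auto itor = jobs.begin();
      while (itor != jobs.end()) {
        if (*itor % 2 == 0) {       // stand-in for "job vanished from the objectstore"
          itor = jobs.erase(itor);  // erase returns the iterator past the removed element
          continue;                 // skip the normal increment
        }
        ++itor;
      }
      assert((jobs == std::list<int>{1, 3}));
      return 0;
    }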
@@ -286,8 +286,6 @@ public:
std::string queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) override;
void queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& logContext) override;
std::map<std::string, std::list<common::dataStructures::ArchiveJob>> getArchiveJobs() const override;
@@ -329,7 +327,12 @@ public:
void deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& requester,
const std::string& remoteFile) override;
void deleteArchiveRequest(const common::dataStructures::SecurityIdentity & cliIdentity) override;
/**
* Idempotently deletes the specified ArchiveRequest from the objectstore
* @param request the DeleteArchiveRequest holding the address of the ArchiveRequest and the archiveFileID of the file to delete
* @param lc the LogContext
*/
virtual void cancelArchive(const common::dataStructures::DeleteArchiveRequest& request, log::LogContext & lc) override;
std::list<cta::common::dataStructures::RetrieveJob> getRetrieveJobs(const std::string &vid) const override;
......
@@ -110,18 +110,10 @@ public:
return m_OStoreDB.queueArchive(instanceName, request, criteria, logContext);
}
void queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& lc) override {
return m_OStoreDB.queueArchiveForRepack(std::move(request),lc);
}
void deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& cliIdentity, const std::string& remoteFile) override {
m_OStoreDB.deleteRetrieveRequest(cliIdentity, remoteFile);
}
void deleteArchiveRequest(const common::dataStructures::SecurityIdentity& cliIdentity) override{
}
std::list<cta::common::dataStructures::RetrieveJob> getRetrieveJobs(const std::string& tapePoolName) const override {
return m_OStoreDB.getRetrieveJobs(tapePoolName);
}
@@ -228,6 +220,10 @@ public:
return m_OStoreDB.queueRetrieve(rqst, criteria, diskSystemName, logContext);
}
void cancelArchive(const common::dataStructures::DeleteArchiveRequest& request, log::LogContext & lc) override {
m_OStoreDB.cancelArchive(request,lc);
}
void cancelRetrieve(const std::string& instanceName, const cta::common::dataStructures::CancelRetrieveRequest& rqst,
log::LogContext& lc) override {
m_OStoreDB.cancelRetrieve(instanceName, rqst, lc);
......
@@ -249,19 +249,12 @@ void Scheduler::deleteArchive(const std::string &instanceName, const common::dat
// We have different possible scenarios here. The file can be safe in the catalogue,
// fully queued, or partially queued.
// First, make sure the file is not queued anymore.
// TEMPORARILY commenting out SchedulerDatabase::deleteArchiveRequest() in order
// to reduce latency. PLEASE NOTE however that this means files "in-flight" to
// tape will not be deleted and they will appear in the CTA catalogue when they
// are finally written to tape.
//try {
// m_db.deleteArchiveRequest(instanceName, request.archiveFileID);
//} catch (exception::Exception &dbEx) {
// // The file was apparently not queued. If we fail to remove it from the
// // catalogue for any reason other than it does not exist in the catalogue,
// // then it is an error.
// m_catalogue.deleteArchiveFile(instanceName, request.archiveFileID);
//}
utils::Timer t;
if(request.address) {
//An address was provided, so we can remove the request from the objectstore
m_db.cancelArchive(request,lc);
}
m_catalogue.deleteArchiveFile(instanceName, request.archiveFileID, lc);
auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
log::ScopedParamContainer spc(lc);
@@ -1707,7 +1700,14 @@ void Scheduler::reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob> >&
.add("reportType", j->reportType())
.add("exceptionMSG", ex.getMessageValue());
lc.log(log::ERR, "In Scheduler::reportArchiveJobsBatch(): failed to launch reporter.");
j->reportFailed(ex.getMessageValue(), lc);
try {
j->reportFailed(ex.getMessageValue(), lc);
} catch(const cta::objectstore::Backend::NoSuchObject &ex){
params.add("fileId",j->archiveFile.archiveFileID)
.add("reportType",j->reportType())
.add("exceptionMSG",ex.getMessageValue());
lc.log(log::WARNING,"In Scheduler::reportArchiveJobsBatch(): failed to reportFailed the job because it does not exist in the objectstore.");
}
}
}
timingList.insertAndReset("asyncReportLaunchTime", t);
@@ -1722,7 +1722,14 @@ void Scheduler::reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob> >&
.add("reportType", current.archiveJob->reportType())
.add("exceptionMSG", ex.getMessageValue());
lc.log(log::ERR, "In Scheduler::reportArchiveJobsBatch(): failed to report.");
current.archiveJob->reportFailed(ex.getMessageValue(), lc);
try {
current.archiveJob->reportFailed(ex.getMessageValue(), lc);
} catch(const cta::objectstore::Backend::NoSuchObject &ex){
params.add("fileId",current.archiveJob->archiveFile.archiveFileID)
.add("reportType",current.archiveJob->reportType())
.add("exceptionMSG",ex.getMessageValue());
lc.log(log::WARNING,"In Scheduler::reportArchiveJobsBatch(): failed to reportFailed the current job because it does not exist in the objectstore.");
}
}
}
timingList.insertAndReset("reportCompletionTime", t);
......
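
Both reportArchiveJobsBatch hunks above wrap the reportFailed() fallback in the same guard: if the failure handler itself discovers that the underlying object is gone, that case is downgraded to a warning instead of propagating. A standalone sketch of the pattern, with a runtime_error subclass standing in for Backend::NoSuchObject:

    #include <iostream>
    #include <stdexcept>

    // Stand-in for cta::objectstore::Backend::NoSuchObject.
    struct NoSuchObject : std::runtime_error { using std::runtime_error::runtime_error; };

    // Stand-in for ArchiveJob::reportFailed(): may find the object gone.
    void reportFailed(bool objectGone) {
      if (objectGone) throw NoSuchObject("no such object");
    }

    void handleReportFailure(bool objectGone) {
      try {
        reportFailed(objectGone);
      } catch (const NoSuchObject &ex) {
        // Vanished object: log a warning and drop the job, do not rethrow.
        std::cout << "WARNING: cannot mark job failed, it no longer exists: "
                  << ex.what() << "\n";
      }
    }

    int main() {
      handleReportFailure(true);  // object vanished: logged, not fatal
      handleReportFailure(false); // normal failure handling succeeded
      return 0;
    }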
@@ -34,6 +34,7 @@
#include "common/dataStructures/CancelRetrieveRequest.hpp"
#include "common/dataStructures/RepackInfo.hpp"
#include "common/dataStructures/SecurityIdentity.hpp"
#include "common/dataStructures/DeleteArchiveRequest.hpp"
#include "disk/DiskSystem.hpp"
#include "common/remoteFS/RemotePathAndStatus.hpp"
#include "common/exception/Exception.hpp"
@@ -121,8 +122,7 @@ public:
virtual std::string queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria,
log::LogContext &logContext) = 0;
virtual void queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request,log::LogContext &logContext) = 0;
/**
* Returns all of the queued archive jobs. The returned jobs are
@@ -328,7 +328,12 @@ public:
const common::dataStructures::SecurityIdentity &cliIdentity,
const std::string &remoteFile) = 0;
virtual void deleteArchiveRequest(const common::dataStructures::SecurityIdentity & cliIdentity) = 0;
/**
* Idempotently deletes the specified ArchiveRequest from the objectstore
* @param request the DeleteArchiveRequest identifying the ArchiveRequest to delete
* @param lc the LogContext
*/
virtual void cancelArchive(const common::dataStructures::DeleteArchiveRequest& request, log::LogContext & lc) = 0;
/**
* Returns all of the queued archive jobs. The returned jobs are
......
@@ -288,13 +288,25 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
} else {
// Just log this was an empty mount and that's it. The memory management
// will be deallocated automatically.
lc.log(cta::log::ERR, "Aborting recall mount startup: empty mount");
cta::log::LogContext::ScopedParam sp1(lc, cta::log::Param("errorMessage", "Aborted: empty recall mount"));
lc.log(cta::log::WARNING, "Aborting recall mount startup: empty mount");
std::string mountId = retrieveMount->getMountTransactionId();
std::string mountType = cta::common::dataStructures::toString(retrieveMount->getMountType());
cta::log::Param errorMessageParam("errorMessage", "Aborted: empty recall mount");
cta::log::Param mountIdParam("mountId", mountId);
cta::log::Param mountTypeParam("mountType", mountType);
cta::log::LogContext::ScopedParam sp1(lc, errorMessageParam);
try {
retrieveMount->abort();
cta::log::LogContext::ScopedParam sp08(lc, cta::log::Param("MountTransactionId", retrieveMount->getMountTransactionId()));
rwd.updateStats(TapeSessionStats());
rwd.reportStats();
std::list<cta::log::Param> paramList { errorMessageParam, mountIdParam, mountTypeParam };
m_intialProcess.addLogParams(m_driveConfig.unitName,paramList);
cta::log::LogContext::ScopedParam sp08(lc, cta::log::Param("MountTransactionId", mountId));
cta::log::LogContext::ScopedParam sp11(lc, cta::log::Param("errorMessage", "Aborted: empty recall mount"));
lc.log(cta::log::ERR, "Notified client of end session with error");
lc.log(cta::log::WARNING, "Notified client of end session with error");
} catch(cta::exception::Exception & ex) {
cta::log::LogContext::ScopedParam sp1(lc, cta::log::Param("notificationError", ex.getMessageValue()));
lc.log(cta::log::ERR, "Failed to notified client of end session with error");
@@ -403,12 +415,22 @@