Commit cc3f1b34 authored by Eric Cano's avatar Eric Cano
Browse files

Changed the interface of the request deletion sequence into the creation of an

object with callbacks, like for the archival request creation.
Implemented this interface in both Mock and ObjectStore implementations of the
SchedulerDatabase.
parent 7b26cfa0
......@@ -66,6 +66,22 @@ void cta::objectstore::ArchiveToFileRequest::setJobsLinkingToTapePool() {
}
}
void cta::objectstore::ArchiveToFileRequest::setJobsFailed() {
checkPayloadWritable();
auto * jl=m_payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
j->set_status(serializers::AJS_Failed);
}
}
void cta::objectstore::ArchiveToFileRequest::setJobsPendingNSdeletion() {
checkPayloadWritable();
auto * jl=m_payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
j->set_status(serializers::AJS_PendingNsDeletion);
}
}
void cta::objectstore::ArchiveToFileRequest::setArchiveFile(
const std::string& archiveFile) {
checkPayloadWritable();
......
......@@ -57,6 +57,10 @@ public:
bool finishIfNecessary();
// Mark all jobs as pending mount (following their linking to a tape pool)
void setJobsLinkingToTapePool();
// Mark all the jobs as being deleted, in case of a cancellation
void setJobsFailed();
// Mark all the jobs as pending deletion from NS.
void setJobsPendingNSdeletion();
// Request management ========================================================
void setSuccessful();
void setFailed();
......
......@@ -37,50 +37,14 @@ protected:
m_existingObject(false), m_locksCount(0),
m_locksForWriteCount(0) {}
public:
class AddressNotSet: public cta::exception::Exception {
public:
AddressNotSet(const std::string & w): cta::exception::Exception(w) {}
};
class NotLocked: public cta::exception::Exception {
public:
NotLocked(const std::string & w): cta::exception::Exception(w) {}
};
class AlreadyLocked: public cta::exception::Exception {
public:
AlreadyLocked(const std::string & w): cta::exception::Exception(w) {}
};
class WrongType: public cta::exception::Exception {
public:
WrongType(const std::string & w): cta::exception::Exception(w) {}
};
class NotNewObject: public cta::exception::Exception {
public:
NotNewObject(const std::string & w): cta::exception::Exception(w) {}
};
class NewObject: public cta::exception::Exception {
public:
NewObject(const std::string & w): cta::exception::Exception(w) {}
};
class NotFetched: public cta::exception::Exception {
public:
NotFetched(const std::string & w): cta::exception::Exception(w) {}
};
class NotInitialized: public cta::exception::Exception {
public:
NotInitialized(const std::string & w): cta::exception::Exception(w) {}
};
class AddressAlreadySet: public cta::exception::Exception {
public:
AddressAlreadySet(const std::string & w): cta::exception::Exception(w) {}
};
CTA_GENERATE_EXCEPTION_CLASS(AddressNotSet);
CTA_GENERATE_EXCEPTION_CLASS(NotLocked);
CTA_GENERATE_EXCEPTION_CLASS(WrongType);
CTA_GENERATE_EXCEPTION_CLASS(NotNewObject);
CTA_GENERATE_EXCEPTION_CLASS(NewObject);
CTA_GENERATE_EXCEPTION_CLASS(NotFetched);
CTA_GENERATE_EXCEPTION_CLASS(NotInitialized);
CTA_GENERATE_EXCEPTION_CLASS(AddressAlreadySet);
protected:
void checkHeaderWritable() {
if (!m_headerInterpreted)
......
......@@ -238,6 +238,7 @@ void cta::objectstore::TapePool::removeJob(const std::string& archiveToFileAddre
auto * jl=m_payload.mutable_pendingarchivejobs();
bool found = false;
do {
found = false;
// Push the found entry all the way to the end.
for (size_t i=0; i<(size_t)jl->size(); i++) {
if (jl->Get(i).address() == archiveToFileAddress) {
......
......@@ -544,7 +544,8 @@ void cta::MockSchedulerDatabase::deleteArchiveRequest(
//------------------------------------------------------------------------------
// markArchiveRequestForDeletion
//------------------------------------------------------------------------------
void cta::MockSchedulerDatabase::markArchiveRequestForDeletion(
std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCancelation>
cta::MockSchedulerDatabase::markArchiveRequestForDeletion(
const SecurityIdentity &requester,
const std::string &archiveFile) {
char *zErrMsg = 0;
......@@ -565,14 +566,12 @@ void cta::MockSchedulerDatabase::markArchiveRequestForDeletion(
" does not exist";
throw(exception::Exception(msg.str()));
}
return std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCancelation>(
new ArchiveToFileRequestCancelation(requester, archiveFile, *this));
}
//------------------------------------------------------------------------------
// fileEntryDeletedFromNS
//------------------------------------------------------------------------------
void cta::MockSchedulerDatabase::fileEntryDeletedFromNS(
const SecurityIdentity &requester, const std::string &archiveFile) {
deleteArchiveRequest(requester, archiveFile);
void cta::MockSchedulerDatabase::ArchiveToFileRequestCancelation::complete() {
m_parent.deleteArchiveRequest(m_requester, m_archiveFile);
}
//------------------------------------------------------------------------------
......
......@@ -109,6 +109,28 @@ public:
const SecurityIdentity &requester,
const std::string &archiveFile);
/*
* Subclass allowing the tracking and automated cleanup of a
* ArchiveToFile requests on the SchdulerDB for deletion.
* This will mark the request as to be deleted, and then add it to the agent's
* list. In a second step, the request will be completely deleted when calling
* the complete() method.
* In case of failure, the request will be queued to the orphaned requests queue,
* so that the scheduler picks it up later.
*/
class ArchiveToFileRequestCancelation: public SchedulerDatabase::ArchiveToFileRequestCancelation {
public:
ArchiveToFileRequestCancelation(const SecurityIdentity &requester,
const std::string &archiveFile, MockSchedulerDatabase & parent): m_requester(requester),
m_archiveFile(archiveFile), m_parent(parent) {}
virtual void complete();
virtual ~ArchiveToFileRequestCancelation() {};
private:
SecurityIdentity m_requester;
std::string m_archiveFile;
MockSchedulerDatabase & m_parent;
};
/**
* Marks the specified archive request for deletion. The request can only be
* fully deleted once the corresponding entry has been deleted from the
......@@ -118,19 +140,7 @@ public:
* @param archiveFile The absolute path of the destination file within the
* archive namespace.
*/
void markArchiveRequestForDeletion(
const SecurityIdentity &requester,
const std::string &archiveFile);
/**
* Notifies the scheduler database that the specified file entry has been
* deleted from the archive namespace.
*
* @param requester The identity of the requester.
* @param archiveFile The absolute path of the file within the archive
* namespace.
*/
void fileEntryDeletedFromNS(
std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation> markArchiveRequestForDeletion(
const SecurityIdentity &requester,
const std::string &archiveFile);
......
......@@ -314,7 +314,7 @@ void OStoreDB::ArchiveToFileRequestCreation::cancel() {
// do here is to delete the request from storage and dereference it from
// the agent's entry
if (m_closed) {
throw ArchveRequestAlreadyCompleteOrCanceled(
throw ArchiveRequestAlreadyCompleteOrCanceled(
"In OStoreDB::ArchiveToFileRequestCreation::cancel: trying the close "
"the request creation twice");
}
......@@ -350,12 +350,6 @@ OStoreDB::ArchiveToFileRequestCreation::~ArchiveToFileRequestCreation() {
} catch (...) {}
}
void OStoreDB::fileEntryDeletedFromNS(const SecurityIdentity& requester,
const std::string &archiveFile) {
throw exception::Exception(std::string(__FUNCTION__) + " not implemented");
}
void OStoreDB::createTapePool(const std::string& name,
const uint32_t nbPartialTapes, const cta::CreationLog &creationLog) {
RootEntry re(m_objectStore);
......@@ -542,7 +536,7 @@ void OStoreDB::deleteLogicalLibrary(const SecurityIdentity& requester,
std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation>
OStoreDB::queue(const cta::ArchiveToFileRequest& rqst) {
assertAgentSet();
// Construct the return value immediately
// Construct the return valucta::OStoreDBe immediately
std::unique_ptr<cta::OStoreDB::ArchiveToFileRequestCreation>
internalRet(new cta::OStoreDB::ArchiveToFileRequestCreation(m_agent, m_objectStore));
cta::objectstore::ArchiveToFileRequest & atfr = internalRet->m_request;
......@@ -601,14 +595,136 @@ void OStoreDB::queue(const ArchiveToDirRequest& rqst) {
void OStoreDB::deleteArchiveRequest(const SecurityIdentity& requester,
const std::string& archiveFile) {
throw exception::Exception("Not Implemented");
// First of, find the archive request form all the tape pools.
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
auto tpl = re.dumpTapePools();
rel.release();
for (auto tpp=tpl.begin(); tpp!= tpl.end(); tpp++) {
objectstore::TapePool tp(tpp->address, m_objectStore);
ScopedSharedLock tplock(tp);
tp.fetch();
auto ajl=tp.dumpJobs();
tplock.release();
for (auto ajp=ajl.begin(); ajp!=ajl.end(); ajp++) {
objectstore::ArchiveToFileRequest atfr(ajp->address, m_objectStore);
ScopedSharedLock atfrl(atfr);
atfr.fetch();
if (atfr.getArchiveFile() == archiveFile) {
atfrl.release();
objectstore::ScopedExclusiveLock al(*m_agent);
m_agent->fetch();
m_agent->addToOwnership(atfr.getAddressIfSet());
m_agent->commit();
ScopedExclusiveLock atfrxl(atfr);
atfr.fetch();
atfr.setJobsFailed();
atfr.setOwner(m_agent->getAddressIfSet());
atfr.commit();
auto jl = atfr.dumpJobs();
for (auto j=jl.begin(); j!=jl.end(); j++) {
try {
objectstore::TapePool tp(j->tapePoolAddress, m_objectStore);
ScopedExclusiveLock tpl(tp);
tp.fetch();
tp.removeJob(atfr.getAddressIfSet());
tp.commit();
} catch (...) {}
}
atfr.remove();
m_agent->removeFromOwnership(atfr.getAddressIfSet());
m_agent->commit();
}
}
}
throw NoSuchArchiveRequest("In OStoreDB::deleteArchiveRequest: ArchiveToFileRequest not found");
}
void OStoreDB::markArchiveRequestForDeletion(const SecurityIdentity& requester,
std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation>
OStoreDB::markArchiveRequestForDeletion(const SecurityIdentity& requester,
const std::string& archiveFile) {
throw exception::Exception("Not Implemented");
assertAgentSet();
// Construct the return value immediately
std::unique_ptr<cta::OStoreDB::ArchiveToFileRequestCancelation>
internalRet(new cta::OStoreDB::ArchiveToFileRequestCancelation(m_agent, m_objectStore));
cta::objectstore::ArchiveToFileRequest & atfr = internalRet->m_request;
cta::objectstore::ScopedExclusiveLock & atfrl = internalRet->m_lock;
// Attempt to find the request
objectstore::RootEntry re(m_objectStore);
ScopedSharedLock rel(re);
re.fetch();
auto tpl=re.dumpTapePools();
rel.release();
for (auto tpp=tpl.begin(); tpp!=tpl.end(); tpp++) {
try {
objectstore::TapePool tp(tpp->address, m_objectStore);
ScopedSharedLock tpl(tp);
tp.fetch();
auto arl = tp.dumpJobs();
tpl.release();
for (auto arp=arl.begin(); arp!=arl.end(); arp++) {
objectstore::ArchiveToFileRequest tatfr(arp->address, m_objectStore);
objectstore::ScopedSharedLock tatfrl(tatfr);
tatfr.fetch();
if (tatfr.getArchiveFile() == archiveFile) {
// Point the agent to the request
ScopedExclusiveLock agl(*m_agent);
m_agent->fetch();
m_agent->addToOwnership(arp->address);
m_agent->commit();
agl.release();
// Mark all jobs are being pending NS deletion (for being deleted them selves)
tatfrl.release();
atfr.setAddress(arp->address);
atfrl.lock(atfr);
atfr.fetch();
atfr.setJobsPendingNSdeletion();
atfr.commit();
// Unlink the jobs from the tape pools (it is safely referenced in the agent)
auto atpl=atfr.dumpJobs();
for (auto atpp=atpl.begin(); atpp!=atpl.end(); atpp++) {
objectstore::TapePool atp(atpp->tapePoolAddress, m_objectStore);
objectstore::ScopedExclusiveLock atpl(atp);
atp.fetch();
atp.removeJob(arp->address);
atp.commit();
}
// Return the object to the caller, so complete() can be called later.
std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCancelation> ret;
ret.reset(internalRet.release());
return ret;
}
}
} catch (...) {}
}
throw NoSuchArchiveRequest("In OStoreDB::markArchiveRequestForDeletion: ArchiveToFileRequest no found");
}
void OStoreDB::ArchiveToFileRequestCancelation::complete() {
if (m_closed)
throw ArchiveRequestAlreadyDeleted("OStoreDB::ArchiveToFileRequestCancelation::complete(): called twice");
// We just need to delete the object and forget it
m_request.remove();
objectstore::ScopedExclusiveLock al (*m_agent);
m_agent->fetch();
m_agent->removeFromOwnership(m_request.getAddressIfSet());
m_agent->commit();
m_closed = true;
}
OStoreDB::ArchiveToFileRequestCancelation::~ArchiveToFileRequestCancelation() {
if (!m_closed) {
m_request.garbageCollect(m_agent->getAddressIfSet());
objectstore::ScopedExclusiveLock al (*m_agent);
m_agent->fetch();
m_agent->removeFromOwnership(m_request.getAddressIfSet());
m_agent->commit();
}
}
std::map<cta::TapePool, std::list<ArchiveToTapeCopyRequest> >
OStoreDB::getArchiveRequests() const {
objectstore::RootEntry re(m_objectStore);
......
......@@ -127,7 +127,7 @@ public:
/* === Archival requests handling ======================================== */
CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestHasNoCopies);
CTA_GENERATE_EXCEPTION_CLASS(ArchveRequestAlreadyCompleteOrCanceled);
CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyCompleteOrCanceled);
class ArchiveToFileRequestCreation:
public cta::SchedulerDatabase::ArchiveToFileRequestCreation {
public:
......@@ -151,11 +151,26 @@ public:
virtual void queue(const ArchiveToDirRequest& rqst);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveRequest);
CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyDeleted);
virtual void deleteArchiveRequest(const SecurityIdentity& requester, const std::string& archiveFile);
virtual void markArchiveRequestForDeletion(const SecurityIdentity &requester, const std::string &archiveFile);
virtual void fileEntryDeletedFromNS(const SecurityIdentity &requester, const std::string &archiveFile);
class ArchiveToFileRequestCancelation:
public SchedulerDatabase::ArchiveToFileRequestCancelation {
public:
ArchiveToFileRequestCancelation(objectstore::Agent * agent,
objectstore::Backend & be): m_request(be), m_lock(), m_objectStore(be),
m_agent(agent), m_closed(false) {}
virtual ~ArchiveToFileRequestCancelation();
virtual void complete();
private:
objectstore::ArchiveToFileRequest m_request;
objectstore::ScopedExclusiveLock m_lock;
objectstore::Backend & m_objectStore;
objectstore::Agent * m_agent;
bool m_closed;
friend class OStoreDB;
};
virtual std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation> markArchiveRequestForDeletion(const SecurityIdentity &requester, const std::string &archiveFile);
virtual std::map<TapePool, std::list<ArchiveToTapeCopyRequest> > getArchiveRequests() const;
......
......@@ -122,8 +122,8 @@ public:
m_OStoreDB.deleteArchiveRequest(requester, archiveFile);
}
virtual void markArchiveRequestForDeletion(const SecurityIdentity &requester, const std::string &archiveFile) {
m_OStoreDB.markArchiveRequestForDeletion(requester, archiveFile);
virtual std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCancelation> markArchiveRequestForDeletion(const SecurityIdentity &requester, const std::string &archiveFile) {
return m_OStoreDB.markArchiveRequestForDeletion(requester, archiveFile);
}
virtual void deleteLogicalLibrary(const SecurityIdentity& requester, const std::string& name) {
......@@ -146,10 +146,6 @@ public:
m_OStoreDB.deleteTapePool(requester, name);
}
virtual void fileEntryDeletedFromNS(const SecurityIdentity& requester, const std::string &archiveFile) {
m_OStoreDB.fileEntryDeletedFromNS(requester, archiveFile);
}
virtual std::list<AdminHost> getAdminHosts() const {
return m_OStoreDB.getAdminHosts();
}
......
......@@ -99,9 +99,11 @@ std::list<cta::ArchiveToTapeCopyRequest> cta::Scheduler::getArchiveRequests(
void cta::Scheduler::deleteArchiveRequest(
const SecurityIdentity &requester,
const std::string &archiveFile) {
m_db.markArchiveRequestForDeletion(requester, archiveFile);
std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCancelation>
reqCancelation(
m_db.markArchiveRequestForDeletion(requester, archiveFile));
m_ns.deleteFile(requester, archiveFile);
m_db.fileEntryDeletedFromNS(requester, archiveFile);
reqCancelation->complete();
}
//------------------------------------------------------------------------------
......
......@@ -113,7 +113,21 @@ public:
virtual void deleteArchiveRequest(
const SecurityIdentity &requester,
const std::string &archiveFile) = 0;
/*
* Subclass allowing the tracking and automated cleanup of a
* ArchiveToFile requests on the SchdulerDB for deletion.
* This will mark the request as to be deleted, and then add it to the agent's
* list. In a second step, the request will be completely deleted when calling
* the complete() method.
* In case of failure, the request will be queued to the orphaned requests queue,
* so that the scheduler picks it up later.
*/
class ArchiveToFileRequestCancelation {
public:
virtual void complete() = 0;
virtual ~ArchiveToFileRequestCancelation() {};
};
/**
* Marks the specified archive request for deletion. The request can only be
* fully deleted once the corresponding entry has been deleted from the
......@@ -123,19 +137,7 @@ public:
* @param archiveFile The absolute path of the destination file within the
* archive namespace.
*/
virtual void markArchiveRequestForDeletion(
const SecurityIdentity &requester,
const std::string &archiveFile) = 0;
/**
* Notifies the scheduler database that the specified file entry has been
* deleted from the archive namespace.
*
* @param requester The identity of the requester.
* @param archiveFile The absolute path of the file within the archive
* namespace.
*/
virtual void fileEntryDeletedFromNS(
virtual std::unique_ptr<ArchiveToFileRequestCancelation> markArchiveRequestForDeletion(
const SecurityIdentity &requester,
const std::string &archiveFile) = 0;
......
......@@ -1925,7 +1925,7 @@ TEST_P(SchedulerTest, delete_archive_request) {
ASSERT_FALSE(archiveFiles.find("/grandparent/parent_file") == archiveFiles.end());
}
ASSERT_NO_THROW(scheduler.deleteArchiveRequest(s_userOnUserHost,
/*ASSERT_NO_THROW*/(scheduler.deleteArchiveRequest(s_userOnUserHost,
"/grandparent/parent_file"));
{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment