Commit 1eb1e6d9 authored by Eric Cano's avatar Eric Cano
Browse files

Stopped using archivequeueaddress field in archive jobs.

This field had a meaning when tapepools where stored in the objectstore and the archive queue a static object.
Since the introduction of the catalogue, they are now transient objects and storing their address
is not meaninful anymore.
parent 7223aca1
......@@ -56,8 +56,10 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
j->set_copynb(copyNumber);
j->set_status(serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue);
j->set_tapepool(tapepool);
j->set_owner("");
j->set_archivequeueaddress(archivequeueaddress);
j->set_owner(archivequeueaddress);
// XXX This field (archivequeueaddress) is a leftover from a past layout when tape pools were static
// in the object store, and should be eventually removed.
j->set_archivequeueaddress("");
j->set_totalretries(0);
j->set_retrieswithinmount(0);
j->set_lastmountwithfailure(0);
......@@ -65,18 +67,6 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
j->set_maxtotalretries(maxTotalRetries);
}
void ArchiveRequest::setJobArchiveQueueAddress(uint16_t copyNumber, const std::string& queueAddress) {
checkPayloadWritable();
auto * jl = m_payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() == copyNumber) {
j->set_archivequeueaddress(queueAddress);
return;
}
}
throw NoSuchJob("In ArchiveRequest::setJobArchiveQueueAddress(): job not found");
}
bool cta::objectstore::ArchiveRequest::setJobSuccessful(uint16_t copyNumber) {
checkPayloadWritable();
auto * jl = m_payload.mutable_jobs();
......@@ -287,7 +277,7 @@ auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> {
ret.push_back(JobDump());
ret.back().copyNb = j->copynb();
ret.back().tapePool = j->tapepool();
ret.back().ArchiveQueueAddress = j->archivequeueaddress();
ret.back().owner = j->owner();
}
return ret;
}
......@@ -311,13 +301,13 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner) {
// If we fail to reconnect, we have to fail the job and potentially
// finish the request.
try {
ArchiveQueue aq(j->archivequeueaddress(), m_objectStore);
ArchiveQueue aq(j->owner(), m_objectStore);
ScopedExclusiveLock tpl(aq);
aq.fetch();
ArchiveRequest::JobDump jd;
jd.copyNb = j->copynb();
jd.tapePool = j->tapepool();
jd.ArchiveQueueAddress = j->archivequeueaddress();
jd.owner = j->owner();
if (aq.addJobIfNecessary(jd, getAddressIfSet(), getArchiveFile().archiveFileID,
getArchiveFile().fileSize, getMountPolicy(), getEntryLog().time))
aq.commit();
......@@ -335,13 +325,13 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner) {
// queue for files orphaned pending ns creation. Some user process will have
// to pick them up actively (recovery involves schedulerDB + NameServerDB)
try {
ArchiveQueue aq(j->archivequeueaddress(), m_objectStore);
ArchiveQueue aq(j->owner(), m_objectStore);
ScopedExclusiveLock tpl(aq);
aq.fetch();
ArchiveRequest::JobDump jd;
jd.copyNb = j->copynb();
jd.tapePool = j->tapepool();
jd.ArchiveQueueAddress = j->archivequeueaddress();
jd.owner = j->owner();
if (aq.addOrphanedJobPendingNsCreation(jd, getAddressIfSet(),
m_payload.archivefileid(), m_payload.filesize()))
aq.commit();
......@@ -357,13 +347,13 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner) {
// queue for files orphaned pending ns deletion. Some user process will have
// to pick them up actively (recovery involves schedulerDB + NameServerDB)
try {
ArchiveQueue aq(j->archivequeueaddress(), m_objectStore);
ArchiveQueue aq(j->owner(), m_objectStore);
ScopedExclusiveLock tpl(aq);
aq.fetch();
ArchiveRequest::JobDump jd;
jd.copyNb = j->copynb();
jd.tapePool = j->tapepool();
jd.ArchiveQueueAddress = j->archivequeueaddress();
jd.owner = j->owner();
if (aq.addOrphanedJobPendingNsCreation(jd, getAddressIfSet(),
m_payload.archivefileid(), m_payload.filesize()))
aq.commit();
......
......@@ -44,7 +44,6 @@ public:
// Job management ============================================================
void addJob(uint16_t copyNumber, const std::string & tapepool,
const std::string & archivequeueaddress, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries);
void setJobArchiveQueueAddress(uint16_t copyNumber, const std::string & queueAddress);
void setJobSelected(uint16_t copyNumber, const std::string & owner);
void setJobPending(uint16_t copyNumber);
bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
......@@ -109,7 +108,7 @@ public:
public:
uint16_t copyNb;
std::string tapePool;
std::string ArchiveQueueAddress;
std::string owner;
};
std::list<JobDump> dumpJobs();
......
......@@ -365,7 +365,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
cta::objectstore::ArchiveRequest::JobDump jd;
jd.copyNb = 1;
jd.tapePool = "TapePool0";
jd.ArchiveQueueAddress = tpAddr[0];
jd.owner = tpAddr[0];
cta::common::dataStructures::MountPolicy policy;
policy.archiveMinRequestAge = 0;
policy.archivePriority = 1;
......@@ -383,7 +383,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
cta::objectstore::ArchiveRequest::JobDump jd;
jd.copyNb = 2;
jd.tapePool = "TapePool1";
jd.ArchiveQueueAddress = tpAddr[1];
jd.owner = tpAddr[1];
cta::common::dataStructures::MountPolicy policy;
policy.archiveMinRequestAge = 0;
policy.archivePriority = 1;
......
......@@ -114,9 +114,8 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNew
aq.addJob(job, archiveRequest.getAddressIfSet(), af.archiveFileID,
af.fileSize, archiveRequest.getMountPolicy(), archiveRequest.getEntryLog().time);
// Back reference the queue in the job and archive request
job.ArchiveQueueAddress = aq.getAddressIfSet();
archiveRequest.setJobArchiveQueueAddress(job.copyNb, job.ArchiveQueueAddress);
archiveRequest.setJobOwner(job.copyNb, job.ArchiveQueueAddress);
job.owner = aq.getAddressIfSet();
archiveRequest.setJobOwner(job.copyNb, job.owner);
}
// We do the same for all the queued requests
for (auto &maqr: maq->m_requests) {
......@@ -126,8 +125,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNew
af.fileSize, maqr->m_archiveRequest.getMountPolicy(),
maqr->m_archiveRequest.getEntryLog().time);
// Back reference the queue in the job and archive request
maqr->m_job.ArchiveQueueAddress = aq.getAddressIfSet();
maqr->m_archiveRequest.setJobArchiveQueueAddress(maqr->m_job.copyNb, aq.getAddressIfSet());
maqr->m_job.owner = aq.getAddressIfSet();
maqr->m_archiveRequest.setJobOwner(maqr->m_job.copyNb, aq.getAddressIfSet());
addedJobs++;
}
......
......@@ -351,10 +351,10 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
aReq.commit();
// Now we can let go off the queue.
shareLock.reset();
linkedTapePools.push_back(j.ArchiveQueueAddress);
linkedTapePools.push_back(j.owner);
log::ScopedParamContainer params(logContext);
params.add("tapepool", j.tapePool)
.add("queueObject", j.ArchiveQueueAddress)
.add("queueObject", j.owner)
.add("jobObject", aReq.getAddressIfSet());
logContext.log(log::INFO, "In OStoreDB::queueArchive(): added job to queue");
}
......@@ -388,7 +388,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
//------------------------------------------------------------------------------
void OStoreDB::deleteArchiveRequest(const std::string &diskInstanceName,
uint64_t fileId) {
// First of, find the archive request form all the tape pools.
// First of, find the archive request from all the tape pools.
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
......@@ -418,7 +418,7 @@ void OStoreDB::deleteArchiveRequest(const std::string &diskInstanceName,
// The owner might not be a queue, in which case the fetch will fail (and it's fine)
try {
// The queue on which we found the job is not locked anymore, so we can re-lock it.
ArchiveQueue aq2(j.ArchiveQueueAddress, m_objectStore);
ArchiveQueue aq2(j.owner, m_objectStore);
ScopedExclusiveLock aq2xl(aq2);
aq2.fetch();
aq2.removeJob(ar.getAddressIfSet());
......@@ -478,7 +478,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation>
// Unlink the jobs from the tape pools (it is safely referenced in the agent)
auto arJobs=ar.dumpJobs();
for (auto atpp=arJobs.begin(); atpp!=arJobs.end(); atpp++) {
objectstore::ArchiveQueue aqp(atpp->ArchiveQueueAddress, m_objectStore);
objectstore::ArchiveQueue aqp(atpp->owner, m_objectStore);
objectstore::ScopedExclusiveLock atpl(aqp);
aqp.fetch();
aqp.removeJob(arp->address);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment