diff --git a/common/log/LogContextTest.cpp b/common/log/LogContextTest.cpp index b939f0351e82ce8c400a064a891634af549139d6..b70f4866823f8170b46b9eb79d0607052339fd2b 100644 --- a/common/log/LogContextTest.cpp +++ b/common/log/LogContextTest.cpp @@ -37,12 +37,12 @@ namespace unitTests { ASSERT_EQ(1U, lc.size()); { // Create an anonymous variable (for its scope) - LogContext::ScopedParam sp(lc, Param("NSFILEID", 12345)); + LogContext::ScopedParam sp(lc, Param("archiveFileID", 12345)); ASSERT_EQ(2U, lc.size()); lc.log(DEBUG, "Two params message"); { // Test that we do not allow duplicate params - LogContext::ScopedParam sp(lc, Param("NSFILEID", 123456)); + LogContext::ScopedParam sp(lc, Param("archiveFileID", 123456)); ASSERT_EQ(2U, lc.size()); LogContext::ScopedParam sp2(lc, Param("TPVID", "T1234")); ASSERT_EQ(3U, lc.size()); @@ -62,16 +62,16 @@ namespace unitTests { std::string first = sl.getLog(); ASSERT_NE(std::string::npos, first.find("MigrationRequestId")); { - LogContext::ScopedParam sp(lc, Param("NSFILEID", 12345)); + LogContext::ScopedParam sp(lc, Param("archiveFileID", 12345)); lc.log(INFO, "Second log"); } std::string second = sl.getLog(); - ASSERT_NE(std::string::npos, second.find("NSFILEID")); + ASSERT_NE(std::string::npos, second.find("archiveFileID")); // We expect the NSFILEID parameter to show up only once (i.e, not after // offset, which marks the end of its first occurrence). lc.log(INFO, "Third log"); std::string third = sl.getLog(); - size_t offset = third.find("NSFILEID") + strlen("NSFILEID"); - ASSERT_EQ(std::string::npos, third.find("NSFILEID", offset)); + size_t offset = third.find("archiveFileID") + strlen("archiveFileID"); + ASSERT_EQ(std::string::npos, third.find("archiveFileID", offset)); } } diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index 90ae7d6af47d0ae8a508d5235c57d4d0cbdc69d3..40a7903d4679712184a5e4444f0da92c4ce489ac 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -108,18 +108,22 @@ public: } void setOwner(const std::string & owner) { + checkHeaderWritable(); m_header.set_owner(owner); } std::string getOwner() { + checkHeaderReadable(); return m_header.owner(); } void setBackupOwner(const std::string & owner) { + checkHeaderWritable(); m_header.set_backupowner(owner); } std::string getBackupOwner() { + checkHeaderReadable(); return m_header.backupowner(); } diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index 30bfe791a987e36325dc006fe54ff073840279cf..35ed8613263cce865130a91ab4567fa9e40e97e3 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -87,12 +87,17 @@ std::string cta::objectstore::RetrieveQueue::dump() { return ret.str(); } -void cta::objectstore::RetrieveQueue::addJob(const RetrieveRequest::JobDump& job, +void cta::objectstore::RetrieveQueue::addJob(uint64_t copyNb, const std::string & retrieveRequestAddress, uint64_t size, const cta::common::dataStructures::MountPolicy & policy, time_t startTime) { checkPayloadWritable(); // Keep track of the mounting criteria ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.incCount(policy.maxDrivesAllowed); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.incCount(policy.retrievePriority); + ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + minRetrieveRequestAgeMap.incCount(policy.retrieveMinRequestAge); if (m_payload.retrievejobs_size()) { if (m_payload.oldestjobcreationtime() > (uint64_t)startTime) 
{ m_payload.set_oldestjobcreationtime(startTime); @@ -105,7 +110,7 @@ void cta::objectstore::RetrieveQueue::addJob(const RetrieveRequest::JobDump& job auto * j = m_payload.add_retrievejobs(); j->set_address(retrieveRequestAddress); j->set_size(size); - j->set_copynb(job.tapeFile.copyNb); + j->set_copynb(copyNb); } cta::objectstore::RetrieveQueue::JobsSummary cta::objectstore::RetrieveQueue::getJobsSummary() { @@ -114,6 +119,18 @@ cta::objectstore::RetrieveQueue::JobsSummary cta::objectstore::RetrieveQueue::ge ret.bytes = m_payload.retrievejobstotalsize(); ret.files = m_payload.retrievejobs_size(); ret.oldestJobStartTime = m_payload.oldestjobcreationtime(); + if (ret.files) { + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + ret.maxDrivesAllowed = maxDriveAllowedMap.maxValue(); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + ret.priority = priorityMap.maxValue(); + ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + ret.minArchiveRequestAge = minArchiveRequestAgeMap.minValue(); + } else { + ret.maxDrivesAllowed = 0; + ret.priority = 0; + ret.minArchiveRequestAge = 0; + } return ret; } diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index 2987518cf996f0456d7ca22aaea8a69497764c24..4c88d888dc98040f491e2f18728225a4c740b7f9 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -41,7 +41,7 @@ public: std::string dump(); // Retrieve jobs management ================================================== - void addJob(const RetrieveRequest::JobDump & job, + void addJob(uint64_t copyNb, const std::string & retrieveRequestAddress, uint64_t size, const cta::common::dataStructures::MountPolicy & policy, time_t startTime); struct JobsSummary { @@ -49,6 +49,8 @@ public: uint64_t bytes; time_t oldestJobStartTime; uint64_t priority; + uint64_t minArchiveRequestAge; + uint64_t maxDrivesAllowed; }; JobsSummary getJobsSummary(); std::list<RetrieveRequestDump> dumpAndFetchRetrieveRequests(); diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index cfa9dcf2d6ac2cc9d9f19337b48e26e5f82e4813..5560aafa43bee9a3a72fb00fc84e8f1a92b4e5b8 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -21,6 +21,7 @@ #include "EntryLogSerDeser.hpp" #include "MountPolicySerDeser.hpp" #include "DiskFileInfoSerDeser.hpp" +#include "ArchiveFileSerDeser.hpp" #include "objectstore/cta.pb.h" #include <json-c/json.h> @@ -45,10 +46,10 @@ void RetrieveRequest::initialize() { m_payloadInterpreted = true; } -void RetrieveRequest::addJob(const cta::common::dataStructures::TapeFile & tapeFile, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) { +void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) { checkPayloadWritable(); auto *tf = m_payload.add_jobs(); - TapeFileSerDeser(tapeFile).serialize(*tf->mutable_tapefile()); + tf->set_copynb(copyNb); tf->set_maxretrieswithinmount(maxRetiesWithinMount); tf->set_maxtotalretries(maxTotalRetries); tf->set_retrieswithinmount(0); @@ -60,7 +61,7 @@ bool RetrieveRequest::setJobSuccessful(uint16_t copyNumber) { checkPayloadWritable(); auto * jl = m_payload.mutable_jobs(); for (auto j=jl->begin(); j!=jl->end(); j++) { - if (j->tapefile().copynb() == copyNumber) { + if (j->copynb() == copyNumber) { j->set_status(serializers::RetrieveJobStatus::RJS_Complete); for (auto j2=jl->begin(); j2!=jl->end(); j2++) { if (j2->status()!= serializers::RetrieveJobStatus::RJS_Complete 
&& @@ -113,17 +114,29 @@ cta::common::dataStructures::RetrieveRequest RetrieveRequest::getSchedulerReques return ret; } +//------------------------------------------------------------------------------ +// getArchiveFile +//------------------------------------------------------------------------------ + +cta::common::dataStructures::ArchiveFile RetrieveRequest::getArchiveFile() { + objectstore::ArchiveFileSerDeser af; + af.deserialize(m_payload.archivefile()); + return af; +} + + //------------------------------------------------------------------------------ // setRetrieveFileQueueCriteria //------------------------------------------------------------------------------ void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria) { checkPayloadWritable(); + ArchiveFileSerDeser(criteria.archiveFile).serialize(*m_payload.mutable_archivefile()); for (auto &tf: criteria.archiveFile.tapeFiles) { MountPolicySerDeser(criteria.mountPolicy).serialize(*m_payload.mutable_mountpolicy()); const uint32_t hardcodedRetriesWithinMount = 3; const uint32_t hardcodedTotalRetries = 6; - addJob(tf.second, hardcodedRetriesWithinMount, hardcodedTotalRetries); + addJob(tf.second.copyNb, hardcodedRetriesWithinMount, hardcodedTotalRetries); } } @@ -135,9 +148,7 @@ auto RetrieveRequest::dumpJobs() -> std::list<JobDump> { std::list<JobDump> ret; for (auto & j: m_payload.jobs()) { ret.push_back(JobDump()); - TapeFileSerDeser tf; - tf.deserialize(j.tapefile()); - ret.back().tapeFile=tf; + ret.back().copyNb=j.copynb(); ret.back().maxRetriesWithinMount=j.maxretrieswithinmount(); ret.back().maxTotalRetries=j.maxtotalretries(); ret.back().retriesWithinMount=j.retrieswithinmount(); @@ -154,15 +165,14 @@ auto RetrieveRequest::getJob(uint16_t copyNb) -> JobDump { checkPayloadReadable(); // find the job for (auto & j: m_payload.jobs()) { - if (j.tapefile().copynb()==copyNb) { + if (j.copynb()==copyNb) { JobDump ret; - TapeFileSerDeser tf; - tf.deserialize(j.tapefile()); - ret.tapeFile=tf; + ret.copyNb=copyNb; ret.maxRetriesWithinMount=j.maxretrieswithinmount(); ret.maxTotalRetries=j.maxtotalretries(); ret.retriesWithinMount=j.retrieswithinmount(); ret.totalRetries=j.totalretries(); + return ret; } } throw NoSuchJob("In objectstore::RetrieveRequest::getJob(): job not found for this copyNb"); @@ -173,9 +183,7 @@ auto RetrieveRequest::getJobs() -> std::list<JobDump> { std::list<JobDump> ret; for (auto & j: m_payload.jobs()) { ret.push_back(JobDump()); - TapeFileSerDeser tf; - tf.deserialize(j.tapefile()); - ret.back().tapeFile=tf; + ret.back().copyNb=j.copynb(); ret.back().maxRetriesWithinMount=j.maxretrieswithinmount(); ret.back().maxTotalRetries=j.maxtotalretries(); ret.back().retriesWithinMount=j.retrieswithinmount(); diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 56c4986dfbe0b4737bafa181dc3490e8b4298153..1ee8735eaa141f6855686224c3b74f2504148030 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -43,15 +43,13 @@ public: RetrieveRequest(GenericObject & go); void initialize(); // Job management ============================================================ - void addJob(const cta::common::dataStructures::TapeFile & tapeFile, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); - void setJobFailureLimits(uint16_t copyNumber, - uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); + void addJob(uint64_t copyNumber, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); void 
setJobSelected(uint16_t copyNumber, const std::string & owner); void setJobPending(uint16_t copyNumber); bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job class JobDump { public: - common::dataStructures::TapeFile tapeFile; + uint64_t copyNb; uint32_t maxTotalRetries; uint32_t maxRetriesWithinMount; uint32_t retriesWithinMount; @@ -83,6 +81,9 @@ public: void setSchedulerRequest(const cta::common::dataStructures::RetrieveRequest & retrieveRequest); cta::common::dataStructures::RetrieveRequest getSchedulerRequest(); + void setArchiveFile(const cta::common::dataStructures::ArchiveFile & archiveFile); + cta::common::dataStructures::ArchiveFile getArchiveFile(); + void setRetrieveFileQueueCriteria(const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria); cta::common::dataStructures::RetrieveFileQueueCriteria getRetrieveFileQueueCriteria(); diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 4b8a82eb819368b09136e9c3484142e007f15603..0c75a35fed111dac06f4ff95be5fe79fdf4b0f9b 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -132,11 +132,11 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool) { checkPayloadWritable(); - // find the address of the tape pool object + // find the address of the archive queue object try { - auto tpp = serializers::findElement(m_payload.archivequeuepointers(), tapePool); + auto aqp = serializers::findElement(m_payload.archivequeuepointers(), tapePool); // Open the tape pool object - ArchiveQueue aq (tpp.address(), ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); + ArchiveQueue aq (aqp.address(), m_objectStore); ScopedExclusiveLock aql; try { aql.lock(aq); @@ -156,16 +156,16 @@ void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool) { // Verify this is the archive queue we're looking for. if (aq.getTapePool() != tapePool) { std::stringstream err; - err << "Unexpected tape pool name found in archive queue pointed to for tape pool: " + err << "In RootEntry::removeArchiveQueueAndCommit(): Unexpected tape pool name found in archive queue pointed to for tape pool: " << tapePool << " found: " << aq.getTapePool(); throw WrongArchiveQueue(err.str()); } // Check the archive queue is empty if (!aq.isEmpty()) { - throw ArchivelQueueNotEmpty ("In RootEntry::removeTapePoolQueueAndCommit: trying to " + throw ArchivelQueueNotEmpty ("In RootEntry::removeArchiveQueueAndCommit(): trying to " "remove a non-empty tape pool"); } - // We can delete the pool + // We can delete the queue aq.remove(); deleteFromRootEntry: // ... and remove it from our entry @@ -174,7 +174,7 @@ void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool) { commit(); } catch (serializers::NotFound &) { // No such tape pool. Nothing to to. 
-    throw NoSuchArchiveQueue("In RootEntry::removeTapePoolQueueAndCommit: trying to remove non-existing tape pool");
+    throw NoSuchArchiveQueue("In RootEntry::removeArchiveQueueAndCommit(): trying to remove non-existing archive queue");
  }
}

@@ -250,6 +250,55 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag
  return retrieveQueueAddress;
}

+void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid) {
+  checkPayloadWritable();
+  // find the address of the retrieve queue object
+  try {
+    auto rqp=serializers::findElement(m_payload.retrievequeuepointers(), vid);
+    // Open the retrieve queue object
+    RetrieveQueue rq(rqp.address(), m_objectStore);
+    ScopedExclusiveLock rql;
+    try {
+      rql.lock(rq);
+      rq.fetch();
+    } catch (cta::exception::Exception & ex) {
+      // The retrieve queue does not seem to be there. Make sure this is the case:
+      if (rq.exists()) {
+        // We failed to access the queue, yet it is present. This is an error.
+        // Let the exception pass through.
+        throw;
+      } else {
+        // The queue object is already gone. We can skip to removing the
+        // reference from the RootEntry
+        goto deleteFromRootEntry;
+      }
+    }
+    // Verify this is the retrieve queue we're looking for.
+    if (rq.getVid() != vid) {
+      std::stringstream err;
+      err << "In RootEntry::removeRetrieveQueueAndCommit(): Unexpected vid found in retrieve queue pointed to for vid: "
+          << vid << " found: " << rq.getVid();
+      throw WrongArchiveQueue(err.str());
+    }
+    // Check the retrieve queue is empty
+    if (!rq.isEmpty()) {
+      throw RetrieveQueueNotEmpty("In RootEntry::removeRetrieveQueueAndCommit(): trying to "
+        "remove a non-empty retrieve queue");
+    }
+    // We can now delete the queue
+    rq.remove();
+  deleteFromRootEntry:
+    // ... and remove it from our entry
+    serializers::removeOccurences(m_payload.mutable_retrievequeuepointers(), vid);
+    // We commit for safety and symmetry with the add operation
+    commit();
+  } catch (serializers::NotFound &) {
+    // No such retrieve queue. Nothing to do.
+    throw NoSuchRetrieveQueue("In RootEntry::removeRetrieveQueueAndCommit(): trying to remove non-existing retrieve queue");
+  }
+}
+
+
 std::string RootEntry::getRetrieveQueue(const std::string& vid) {
  throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}
diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp
index 49c0296efe16aa2d1a6d89101ed6edeac0e5f12f..6dfb1f15929abe2da335462a1cf090618c38e745 100644
--- a/objectstore/RootEntry.hpp
+++ b/objectstore/RootEntry.hpp
@@ -69,10 +69,12 @@ public:
  std::list<ArchiveQueueDump> dumpArchiveQueues();
  // RetrieveQueue handling ====================================================
+  CTA_GENERATE_EXCEPTION_CLASS(RetrieveQueueNotEmpty);
  /** This function implicitly creates the retrieve queue structure and updates
   * the pointer to it. It will implicitly commit the object to the store.
*/ std::string addOrGetRetrieveQueueAndCommit(const std::string & vid, Agent & agent); CTA_GENERATE_EXCEPTION_CLASS(NoSuchRetrieveQueue); + void removeRetrieveQueueAndCommit(const std::string & vid); std::string getRetrieveQueue(const std::string & vid); struct RetrieveQueueDump { std::string vid; diff --git a/objectstore/ValueCountMap.cpp b/objectstore/ValueCountMap.cpp index 02aa7ed29f9fe25706560d25c6f117687c5dd59e..8aa0107c59a76fe641c98d4a92ec97434244c1a4 100644 --- a/objectstore/ValueCountMap.cpp +++ b/objectstore/ValueCountMap.cpp @@ -88,7 +88,8 @@ void ValueCountMap::incCount(uint64_t value) { } uint64_t ValueCountMap::maxValue() { - if (!m_valueCountMap.size()) throw cta::exception::Exception("In ValueCountMap::maxValue: empty map"); + if (!m_valueCountMap.size()) + throw cta::exception::Exception("In ValueCountMap::maxValue: empty map"); uint64_t ret = std::numeric_limits<uint64_t>::min(); std::for_each(m_valueCountMap.begin(), m_valueCountMap.end(), [&](decltype(*m_valueCountMap.begin()) pair) { diff --git a/objectstore/cta.proto b/objectstore/cta.proto index c3405bb09edd036270d0b51c79e200a5b572d258..2395a7da7aaa912be263b0af46564b92850e78ef 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -180,13 +180,36 @@ message MountCriteria { required uint32 quota = 3203; } +message TapeFile { + required string vid = 9120; + required uint64 fseq = 9121; + required uint64 blockid = 9122; + required uint64 compressedsize = 9123; + required uint64 copynb = 9124; + required uint64 creationtime = 9125; + required string checksumtype = 9126; + required string checksumvalue = 9127; +} + +message DiskFileInfo { + required string recoveryBlob = 8900; + required string group = 8910; + required string owner = 8930; + required string path = 8940; +} + message ArchiveFile { - required uint64 fileId = 4351; - required uint64 size = 4352; + required uint64 archivefileid = 4351; + required uint64 filesize = 4352; required string diskfileid = 4353; - required string checksumtype = 4354; - required string checksumvalue = 4355; - required uint64 creationtime = 4356; + required string diskinstance= 4354; + required DiskFileInfo diskfileinfo= 4355; + required string checksumtype = 4356; + required string checksumvalue = 4357; + required uint64 creationtime = 4358; + repeated TapeFile tapefiles = 4359; + required uint64 reconciliationtime = 4360; + required string storageclass = 4361; } // ------------- Archive Jobs -------------------------------------------------- @@ -307,13 +330,6 @@ message User { required string group = 8810; } -message DiskFileInfo { - required string recoveryBlob = 8900; - required string group = 8910; - required string owner = 8930; - required string path = 8940; -} - message EntryLog { required string username = 8950; required string host = 8960; @@ -362,19 +378,8 @@ message SchedulerRetrieveRequest { required EntryLog entrylog = 9106; } -message TapeFile { - required string vid = 9120; - required uint64 fseq = 9121; - required uint64 blockid = 9122; - required uint64 compressedsize = 9123; - required uint64 copynb = 9124; - required uint64 creationtime = 9125; - required string checksumtype = 9126; - required string checksumvalue = 9127; -} - message RetrieveJob { - required TapeFile tapefile = 9200; + required uint64 copynb = 9200; required uint32 maxtotalretries = 9201; required uint32 maxretrieswithinmount = 9202; required uint32 retrieswithinmount = 9203; @@ -385,8 +390,9 @@ message RetrieveJob { message RetrieveRequest { required SchedulerRetrieveRequest 
schedulerrequest = 9150; required MountPolicy mountpolicy = 9151; - required uint32 activecopynb = 9152; - repeated RetrieveJob jobs = 9153; + required ArchiveFile archivefile = 9152; + required uint32 activecopynb = 9153; + repeated RetrieveJob jobs = 9154; } message ValueCountPair { @@ -410,7 +416,7 @@ message RetrieveQueue { required string vid = 10100; repeated RetrieveJobPointer retrievejobs = 10110; repeated ValueCountPair prioritymap = 10131; - repeated ValueCountPair minretrievequestagemap = 10132; + repeated ValueCountPair minretrieverequestagemap = 10132; repeated ValueCountPair maxdrivesallowedmap = 10133; required uint64 retrievejobstotalsize = 10140; required uint64 oldestjobcreationtime = 10150; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index fc61952135821d9a3156707193841e52eb697b62..09b50ddd9a441c2258b9228f3c4b538ded5133c1 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -78,20 +78,19 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> tmdi.m_lockOnSchedulerGlobalLock.lock(*tmdi.m_schedulerGlobalLock); tmdi.m_lockTaken = true; tmdi.m_schedulerGlobalLock->fetch(); - auto tplist = re.dumpArchiveQueues(); - for (auto tpp=tplist.begin(); tpp!=tplist.end(); tpp++) { - // Get the tape pool object - objectstore::ArchiveQueue aqueue(tpp->address, m_objectStore); + // Walk the archive queues for statistics + for (auto & aqp: re.dumpArchiveQueues()) { + objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore); // debug utility variable - std::string __attribute__((__unused__)) poolName = tpp->tapePool; - objectstore::ScopedSharedLock tplock(aqueue); + std::string __attribute__((__unused__)) poolName = aqp.tapePool; + objectstore::ScopedSharedLock aqlock(aqueue); aqueue.fetch(); // If there are files queued, we create an entry for this tape pool in the // mount candidates list. 
      if (aqueue.getJobsSummary().files) {
        tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
        auto & m = tmdi.potentialMounts.back();
-        m.tapePool = tpp->tapePool;
+        m.tapePool = aqp.tapePool;
        m.type = cta::MountType::ARCHIVE;
        m.bytesQueued = aqueue.getJobsSummary().bytes;
        m.filesQueued = aqueue.getJobsSummary().files;
@@ -101,39 +100,30 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
        m.minArchiveRequestAge = aqueue.getJobsSummary().minArchiveRequestAge;
        m.logicalLibrary = "";
      }
-      // TODO TODO: cover the retrieve mounts as well
-      // For each tape in the pool, list the tapes with work
-//      auto tl = aqueue.dumpTapesAndFetchStatus();
-//      for (auto tp = tl.begin(); tp!= tl.end(); tp++) {
-//        objectstore::Tape t(tp->address, m_objectStore);
-//        objectstore::ScopedSharedLock tl(t);
-//        t.fetch();
-//        if (t.getJobsSummary().files) {
-//          tmdi.potentialMounts.push_back(PotentialMount());
-//          auto & m = tmdi.potentialMounts.back();
-//          m.type = cta::MountType::RETRIEVE;
-//          m.bytesQueued = t.getJobsSummary().bytes;
-//          m.filesQueued = t.getJobsSummary().files;
-//          m.oldestJobStartTime = t.getJobsSummary().oldestJobStartTime;
-//          m.priority = t.getJobsSummary().priority;
-//          m.vid = t.getVid();
-//          m.logicalLibrary = t.getLogicalLibrary();
-//
-//          m.mountPolicy.maxFilesQueued =
-//              aqueue.getMountCriteriaByDirection().retrieve.maxFilesQueued;
-//          m.mountPolicy.maxBytesQueued =
-//              aqueue.getMountCriteriaByDirection().retrieve.maxBytesQueued;
-//          m.mountPolicy.maxAge =
-//              aqueue.getMountCriteriaByDirection().retrieve.maxAge;
-//          m.mountPolicy.quota =
-//              aqueue.getMountCriteriaByDirection().retrieve.quota;
-//          m.logicalLibrary = t.getLogicalLibrary();
-//        }
-//      }
    }
-    // Dedication information comes here
-    // TODO
-    //
+    // Walk the retrieve queues for statistics
+    for (auto & rqp: re.dumpRetrieveQueues()) {
+      RetrieveQueue rqueue(rqp.address, m_objectStore);
+      // debug utility variable
+      std::string __attribute__((__unused__)) vid = rqp.vid;
+      ScopedSharedLock rqlock(rqueue);
+      rqueue.fetch();
+      // If there are files queued, we create an entry for this retrieve queue in the
+      // mount candidates list.
+      if (rqueue.getJobsSummary().files) {
+        tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
+        auto & m = tmdi.potentialMounts.back();
+        m.vid = rqp.vid;
+        m.type = cta::MountType::RETRIEVE;
+        m.bytesQueued = rqueue.getJobsSummary().bytes;
+        m.filesQueued = rqueue.getJobsSummary().files;
+        m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime;
+        m.priority = rqueue.getJobsSummary().priority;
+        m.maxDrivesAllowed = rqueue.getJobsSummary().maxDrivesAllowed;
+        m.minArchiveRequestAge = rqueue.getJobsSummary().minArchiveRequestAge;
+        m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
+ } + } // Collect information about the existing mounts objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore); objectstore::ScopedSharedLock drl(dr); @@ -721,10 +711,10 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& ScopedExclusiveLock rql(rq); rq.fetch(); // We need to find the job corresponding to the vid - for (auto & j: rReq.dumpJobs()) { - if (j.tapeFile.vid == vid) { - rq.addJob(j, rReq.getAddressIfSet(), criteria.archiveFile.fileSize, criteria.mountPolicy, rReq.getEntryLog().time); - rReq.setActiveCopyNumber(j.tapeFile.copyNb); + for (auto & j: rReq.getArchiveFile().tapeFiles) { + if (j.second.vid == vid) { + rq.addJob(j.second.copyNb, rReq.getAddressIfSet(), criteria.archiveFile.fileSize, criteria.mountPolicy, rReq.getEntryLog().time); + rReq.setActiveCopyNumber(j.second.copyNb); goto jobAdded; } } @@ -829,9 +819,9 @@ std::map<std::string, std::list<common::dataStructures::RetrieveJob> > OStoreDB: ScopedSharedLock rrl(rr); rr.fetch(); jd.request=rr.getSchedulerRequest(); - for (auto & tc: rr.getJobs()) { - jd.tapeCopies[tc.tapeFile.vid].first=tc.tapeFile.copyNb; - jd.tapeCopies[tc.tapeFile.vid].second=tc.tapeFile; + for (auto & tf: rr.getArchiveFile().tapeFiles) { + jd.tapeCopies[tf.second.vid].first=tf.second.copyNb; + jd.tapeCopies[tf.second.vid].second=tf.second; } } catch (...) { ret[rqp.vid].pop_back(); @@ -913,100 +903,55 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount> OStoreDB::TapeMountDecisionInfo::createRetrieveMount( const std::string& vid, const std::string & tapePool, const std::string driveName, const std::string& logicalLibrary, const std::string& hostName, time_t startTime) { - throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__); -// // In order to create the mount, we have to: -// // Check we actually hold the scheduling lock -// // Check the tape exists, add it to ownership and set its activity status to -// // busy, with the current agent pointing to it for unbusying -// // Set the drive status to up, but do not commit anything to the drive register -// // the drive register does not need garbage collection as it should reflect the -// // latest known state of the drive (and its absence of updating if needed) -// // Prepare the return value -// std::unique_ptr<OStoreDB::RetrieveMount> privateRet( -// new OStoreDB::RetrieveMount(m_objectStore, m_agent)); -// auto &rm = *privateRet; -// // Check we hold the scheduling lock -// if (!m_lockTaken) -// throw SchedulingLockNotHeld("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount: " -// "cannot create mount without holding scheduling lock"); -// // Find the tape and update it -// objectstore::RootEntry re(m_objectStore); -// objectstore::ScopedSharedLock rel(re); -// re.fetch(); -// auto tplist = re.dumpTapePools(); -// auto driveRegisterAddress = re.getDriveRegisterAddress(); -// rel.release(); -// { -// std::string tpAdress; -// for (auto tpp=tplist.begin(); tpp!=tplist.end(); tpp++) -// if (tpp->tapePool == tapePool) -// tpAdress = tpp->address; -// if (!tpAdress.size()) -// throw NoSuchArchiveQueue("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount:" -// " tape pool not found"); -// objectstore::TapePool tp(tpAdress, m_objectStore); -// objectstore::ScopedSharedLock tpl(tp); -// tp.fetch(); -// auto tlist = tp.dumpTapesAndFetchStatus(); -// std::string tAddress; -// for (auto tptr = tlist.begin(); tptr!=tlist.end(); tptr++) { -// if (tptr->vid == vid) -// tAddress = tptr->address; -// } -// if 
(!tAddress.size()) -// throw NoSuchTape("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount:" -// " tape not found"); -// objectstore::Tape t(tAddress, m_objectStore); -// objectstore::ScopedExclusiveLock tlock(t); -// t.fetch(); -// if (t.isArchived()) -// throw TapeNotWritable("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount:" -// " the tape is not readable (archived)"); -// if (t.isDisabled()) -// throw TapeNotWritable("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount:" -// " the tape is not readable (disabled)"); -// if (t.isBusy()) -// throw TapeIsBusy("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount:" -// " the tape is busy"); -// // This tape seems fine for our purposes. We will set it as an owned object -// // so that garbage collection can unbusy the tape in case of a session crash -// { -// objectstore::ScopedExclusiveLock al(m_agent); -// m_agent.fetch(); -// m_agent.addToOwnership(t.getAddressIfSet()); -// m_agent.commit(); -// } -// t.setBusy(driveName, objectstore::Tape::MountType::Archive, hostName, startTime, -// m_agent.getAddressIfSet()); -// t.commit(); -// } -// // Fill up the mount info -// rm.mountInfo.vid = vid; -// rm.mountInfo.drive = driveName; -// rm.mountInfo.logicalLibrary = logicalLibrary; -// rm.mountInfo.mountId = m_schedulerGlobalLock->getIncreaseCommitMountId(); -// rm.mountInfo.tapePool = tapePool; -// // Update the status of the drive in the registry -// { -// // Get hold of the drive registry -// objectstore::DriveRegister dr(driveRegisterAddress, m_objectStore); -// objectstore::ScopedExclusiveLock drl(dr); -// dr.fetch(); -// // The drive is already in-session, to prevent double scheduling before it -// // goes to mount state. If the work to be done gets depleted in the mean time, -// // we will switch back to up. -// dr.reportDriveStatus(driveName, logicalLibrary, -// cta::common::DriveStatus::Starting, startTime, -// cta::MountType::RETRIEVE, privateRet->mountInfo.mountId, -// 0, 0, 0, vid, tapePool); -// dr.commit(); -// } -// // We committed the scheduling decision. We can now release the scheduling lock. -// m_lockOnSchedulerGlobalLock.release(); -// m_lockTaken = false; -// // We can now return the mount session object to the user. 
-// std::unique_ptr<SchedulerDatabase::RetrieveMount> ret(privateRet.release()); -// return ret; + // In order to create the mount, we have to: + // Check we actually hold the scheduling lock + // Check the tape exists, add it to ownership and set its activity status to + // busy, with the current agent pointing to it for unbusying + // Set the drive status to up, but do not commit anything to the drive register + // the drive register does not need garbage collection as it should reflect the + // latest known state of the drive (and its absence of updating if needed) + // Prepare the return value + std::unique_ptr<OStoreDB::RetrieveMount> privateRet( + new OStoreDB::RetrieveMount(m_objectStore, m_agent)); + auto &rm = *privateRet; + // Check we hold the scheduling lock + if (!m_lockTaken) + throw SchedulingLockNotHeld("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount: " + "cannot create mount without holding scheduling lock"); + // Find the tape and update it + objectstore::RootEntry re(m_objectStore); + objectstore::ScopedSharedLock rel(re); + re.fetch(); + auto driveRegisterAddress = re.getDriveRegisterAddress(); + rel.release(); + // Fill up the mount info + rm.mountInfo.vid = vid; + rm.mountInfo.drive = driveName; + rm.mountInfo.mountId = m_schedulerGlobalLock->getIncreaseCommitMountId(); + m_schedulerGlobalLock->commit(); + rm.mountInfo.tapePool = tapePool; + rm.mountInfo.logicalLibrary = logicalLibrary; + // Update the status of the drive in the registry + { + // Get hold of the drive registry + objectstore::DriveRegister dr(driveRegisterAddress, m_objectStore); + objectstore::ScopedExclusiveLock drl(dr); + dr.fetch(); + // The drive is already in-session, to prevent double scheduling before it + // goes to mount state. If the work to be done gets depleted in the mean time, + // we will switch back to up. + dr.reportDriveStatus(driveName, logicalLibrary, + cta::common::DriveStatus::Starting, startTime, + cta::MountType::RETRIEVE, privateRet->mountInfo.mountId, + 0, 0, 0, vid, tapePool); + dr.commit(); + } + // We committed the scheduling decision. We can now release the scheduling lock. + m_lockOnSchedulerGlobalLock.release(); + m_lockTaken = false; + // We can now return the mount session object to the user. + std::unique_ptr<SchedulerDatabase::RetrieveMount> ret(privateRet.release()); + return ret; } OStoreDB::TapeMountDecisionInfo::~TapeMountDecisionInfo() { @@ -1040,12 +985,13 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: } // The archive queue is gone, there is no more job if (!aqAddress.size()) - return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + return nullptr; // Try and open the archive queue. It could be gone by now. try { objectstore::ArchiveQueue aq(aqAddress, m_objectStore); - objectstore::ScopedExclusiveLock aql(aq); + objectstore::ScopedExclusiveLock aql; try { + aql.lock(aq); aq.fetch(); } catch (cta::exception::Exception & ex) { // The queue is now absent. We can remove its reference in the root entry. @@ -1060,7 +1006,7 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: re.removeArchiveQueueAndCommit(mountInfo.tapePool); } catch (RootEntry::ArchivelQueueNotEmpty & ex) { // TODO: improve: if we fail here we could retry to fetch a job. - return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + return nullptr; } } // Pop jobs until we find one actually belonging to the queue. 
@@ -1101,9 +1047,9 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: // Make the ownership official (for this job within the request) privateRet->m_archiveRequest.setJobOwner(job.copyNb, m_agent.getAddressIfSet()); privateRet->m_archiveRequest.commit(); - // Remove the job from the tape pool queue + // Remove the job from the archive queue aq.removeJob(privateRet->m_archiveRequest.getAddressIfSet()); - // We can commit and release the tape pool lock, we will only fill up + // We can commit and release the archive queue lock, we will only fill up // memory structure from here on. aq.commit(); aql.release(); @@ -1127,9 +1073,9 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: objectstore::ScopedExclusiveLock rel (re); re.fetch(); re.removeArchiveQueueAndCommit(mountInfo.tapePool); - return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + return nullptr; } catch (cta::exception::Exception & ex){ - return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + return nullptr; } } // Open the archive queue @@ -1217,85 +1163,97 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo( } auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::RetrieveJob> { - throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__); -// // Find the next file to retrieve -// // Get the tape pool and then tape -// objectstore::RootEntry re(m_objectStore); -// objectstore::ScopedSharedLock rel(re); -// re.fetch(); -// auto tpl = re.dumpTapePools(); -// rel.release(); -// std::string tpAddress; -// for (auto tpp = tpl.begin(); tpp != tpl.end(); tpp++) { -// if (tpp->tapePool == mountInfo.tapePool) -// tpAddress = tpp->address; -// } -// if (!tpAddress.size()) -// throw NoSuchArchiveQueue("In OStoreDB::RetrieveMount::getNextJob(): tape pool not found"); -// objectstore::TapePool tp(tpAddress, m_objectStore); -// objectstore::ScopedSharedLock tplock(tp); -// tp.fetch(); -// auto tl = tp.dumpTapes(); -// tplock.release(); -// std::string tAddress; -// for (auto tptr = tl.begin(); tptr != tl.end(); tptr++) { -// if (tptr->vid == mountInfo.vid) -// tAddress = tptr->address; -// } -// if (!tAddress.size()) -// throw NoSuchTape("In OStoreDB::RetrieveMount::getNextJob(): tape not found"); -// objectstore::Tape t(tAddress, m_objectStore); -// objectstore::ScopedExclusiveLock tlock(t); -// t.fetch(); -// while (t.dumpJobs().size()) { -// // Get the tape pool's jobs list, and pop the first -// auto jl=t.dumpJobs(); -// // First take a lock on and download the job -// // If the request is not around anymore, we will just move the the next -// // Prepare the return value -// std::unique_ptr<OStoreDB::RetrieveJob> privateRet(new OStoreDB::RetrieveJob( -// jl.front().address, m_objectStore, m_agent)); -// privateRet->m_copyNb = jl.front().copyNb; -// objectstore::ScopedExclusiveLock rtfrl; -// try { -// rtfrl.lock(privateRet->m_rtfr); -// privateRet->m_rtfr.fetch(); -// } catch (cta::exception::Exception &) { -// // we failed to access the object. It might be missing. -// // Just pop this job from the pool and move to the next. -// t.removeJob(privateRet->m_rtfr.getAddressIfSet()); -// // Commit in case we do not pass by again. 
-//    t.commit();
-//    continue;
-//    }
-//    // Take ownership of the job
-//    // Add to ownership
-//    objectstore::ScopedExclusiveLock al(m_agent);
-//    m_agent.fetch();
-//    m_agent.addToOwnership(privateRet->m_rtfr.getAddressIfSet());
-//    m_agent.commit();
-//    al.release();
-//    // Make the ownership official (for the whole request in retrieves)
-//    privateRet->m_rtfr.setOwner(m_agent.getAddressIfSet());
-//    privateRet->m_rtfr.commit();
-//    // Remove the job from the tape pool queue
-//    t.removeJob(privateRet->m_rtfr.getAddressIfSet());
-//    // We can commit and release the tape pool lock, we will only fill up
-//    // memory structure from here on.
-//    t.commit();
-//    privateRet->archiveFile = privateRet->m_rtfr.getArchiveFile();
-//    privateRet->remoteFile = privateRet->m_rtfr.getRemoteFile();
-//    objectstore::RetrieveToFileRequest::JobDump jobDump = privateRet->m_rtfr.getJob(privateRet->m_copyNb);
-//    privateRet->nameServerTapeFile.tapeFileLocation.fSeq = jobDump.fseq;
-//    privateRet->nameServerTapeFile.tapeFileLocation.blockId = jobDump.blockid;
-//    privateRet->nameServerTapeFile.tapeFileLocation.copyNb = privateRet->m_copyNb;
-//    privateRet->nameServerTapeFile.tapeFileLocation.vid = mountInfo.vid;
-//
-//    std::numeric_limits<decltype(privateRet->nameServerTapeFile.tapeFileLocation.blockId)>::max();
-//    privateRet->m_jobOwned = true;
-//    return std::unique_ptr<SchedulerDatabase::RetrieveJob> (privateRet.release());
-//  }
-//  return std::unique_ptr<SchedulerDatabase::RetrieveJob>();
+  // Find the next file to retrieve
+  // Get the retrieve queue for the mounted tape
+  objectstore::RootEntry re(m_objectStore);
+  objectstore::ScopedSharedLock rel(re);
+  re.fetch();
+  auto rql = re.dumpRetrieveQueues();
+  rel.release();
+  std::string rqAddress;
+  for (auto & rqp: rql) {
+    if (rqp.vid == mountInfo.vid)
+      rqAddress = rqp.address;
+  }
+  // The retrieve queue is gone. There is no more job.
+  if (!rqAddress.size())
+    return nullptr;
+  // Try and open the retrieve queue. It could be gone by now.
+  try {
+    objectstore::RetrieveQueue rq(rqAddress, m_objectStore);
+    objectstore::ScopedExclusiveLock rqlock;
+    try {
+      rqlock.lock(rq);
+      rq.fetch();
+    } catch (cta::exception::Exception & ex) {
+      // The queue is now absent. We can remove its reference in the root entry.
+      // A new queue could have been added in the mean time, and be non-empty.
+      // We will then fail to remove from the RootEntry (non-fatal).
+      // TODO: We still conclude that the queue is empty on this unlikely event.
+      // (cont'd): A better approach would be to restart the process of this function
+      // from scratch.
+      rel.lock(re);
+      re.fetch();
+      try {
+        re.removeRetrieveQueueAndCommit(mountInfo.vid);
+      } catch (RootEntry::RetrieveQueueNotEmpty & ex) {
+        // TODO: improve: if we fail here we could retry to fetch a job.
+        return nullptr;
+      }
+    }
+    // Pop jobs until we find one actually belonging to the queue.
+    // Any job not really belonging is an uncommitted pop, which we will
+    // re-do here.
+    while (rq.dumpJobs().size()) {
+      // First take a lock on and download the job
+      // If the request is not around anymore, we will just move to the next
+      // Prepare the return value
+      auto job=rq.dumpJobs().front();
+      std::unique_ptr<OStoreDB::RetrieveJob> privateRet(new OStoreDB::RetrieveJob(
+        job.address, m_objectStore, m_agent, *this));
+      privateRet->selectedCopyNb = job.copyNb;
+      objectstore::ScopedExclusiveLock rrl;
+      try {
+        rrl.lock(privateRet->m_retrieveRequest);
+        privateRet->m_retrieveRequest.fetch();
+        if(privateRet->m_retrieveRequest.getOwner() != rq.getAddressIfSet()) {
+          rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet());
+          continue;
+        }
+      } catch (cta::exception::Exception &) {
+        // we failed to access the object. It might be missing.
+        // Just pop this job from the queue and move to the next.
+        rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet());
+        // Commit in case we do not pass by again.
+        rq.commit();
+        continue;
+      }
+      // Take ownership of the job
+      // Add to ownership
+      objectstore::ScopedExclusiveLock al(m_agent);
+      m_agent.fetch();
+      m_agent.addToOwnership(privateRet->m_retrieveRequest.getAddressIfSet());
+      m_agent.commit();
+      al.release();
+      // Make the ownership official
+      privateRet->m_retrieveRequest.setOwner(m_agent.getAddressIfSet());
+      privateRet->m_retrieveRequest.commit();
+      // Remove the job from the retrieve queue
+      rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet());
+      // We can commit and release the retrieve queue lock, we will only fill up
+      // memory structure from here on.
+      rq.commit();
+      rqlock.release();
+      privateRet->retrieveRequest = privateRet->m_retrieveRequest.getSchedulerRequest();
+      privateRet->archiveFile = privateRet->m_retrieveRequest.getArchiveFile();
+      privateRet->m_jobOwned = true;
+      privateRet->m_mountId = mountInfo.mountId;
+      return std::unique_ptr<SchedulerDatabase::RetrieveJob> (std::move(privateRet));
+    }
+    return std::unique_ptr<SchedulerDatabase::RetrieveJob>();
+  } catch (cta::exception::Exception & ex) {
+    return nullptr;
+  }
 }

 void OStoreDB::RetrieveMount::complete(time_t completionTime) {
@@ -1463,8 +1421,10 @@ OStoreDB::ArchiveJob::~ArchiveJob() {
 }

 OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress,
-  objectstore::Backend& os, objectstore::Agent& ag): m_jobOwned(false),
-  m_objectStore(os), m_agent(ag), m_retrieveRequest(jobAddress, os) { }
+  objectstore::Backend& os, objectstore::Agent& ag,
+  OStoreDB::RetrieveMount& rm): m_jobOwned(false),
+  m_objectStore(os), m_agent(ag), m_retrieveRequest(jobAddress, os),
+  m_retrieveMount(rm) { }

 void OStoreDB::RetrieveJob::fail() {
   throw NotImplemented("");
@@ -1546,7 +1506,7 @@ void OStoreDB::RetrieveJob::succeed() {
   objectstore::ScopedExclusiveLock rtfrl(m_retrieveRequest);
   m_retrieveRequest.fetch();
   std::string rtfrAddress = m_retrieveRequest.getAddressIfSet();
-  if (m_retrieveRequest.setJobSuccessful(m_copyNb)) {
+  if (m_retrieveRequest.setJobSuccessful(selectedCopyNb)) {
     m_retrieveRequest.remove();
   } else {
     m_retrieveRequest.commit();
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 0344ed2ecdfb8c026e792e67a5f96b8c833503da..f15a7525fc658ae55f995b39d0ab956c2e444e4e 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -127,7 +127,7 @@ public:

   /* === Retrieve Job handling ============================================== */
   class RetrieveJob: public SchedulerDatabase::RetrieveJob {
-    friend class RetrieveMount;
+    friend class OStoreDB::RetrieveMount;
   public:
CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); @@ -135,14 +135,14 @@ public: virtual void fail() override; virtual ~RetrieveJob() override; private: - RetrieveJob(const std::string &, objectstore::Backend &, objectstore::Agent &); + RetrieveJob(const std::string &, objectstore::Backend &, + objectstore::Agent &, RetrieveMount &); bool m_jobOwned; - uint16_t m_copyNb; + uint64_t m_mountId; objectstore::Backend & m_objectStore; objectstore::Agent & m_agent; objectstore::RetrieveRequest m_retrieveRequest; - std::map<std::string, std::string> m_vidToAddress; /**< Cache of tape objects - * addresses filled up at queuing time */ + RetrieveMount & m_retrieveMount; }; /* === Archive requests handling ========================================= */ diff --git a/scheduler/RetrieveJob.cpp b/scheduler/RetrieveJob.cpp index e49c1784fa94d80b1b41dffcb402b985a5923cb3..dad31c58b644a56e7a930676d275dde53e384b62 100644 --- a/scheduler/RetrieveJob.cpp +++ b/scheduler/RetrieveJob.cpp @@ -28,14 +28,14 @@ cta::RetrieveJob::~RetrieveJob() throw() { // constructor //------------------------------------------------------------------------------ cta::RetrieveJob::RetrieveJob(RetrieveMount &mount, - const common::dataStructures::ArchiveFile &archiveFile, - const std::string &remotePath, - const common::dataStructures::TapeFile &tapeFile, + const common::dataStructures::RetrieveRequest &retrieveRequest, + const common::dataStructures::ArchiveFile & archiveFile, + const uint64_t selectedCopyNb, const PositioningMethod positioningMethod): m_mount(mount), + retrieveRequest(retrieveRequest), archiveFile(archiveFile), - remotePath(remotePath), - tapeFile(tapeFile), + selectedCopyNb(selectedCopyNb), positioningMethod(positioningMethod), transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {} @@ -59,3 +59,28 @@ void cta::RetrieveJob::failed() { void cta::RetrieveJob::retry() { throw std::runtime_error("cta::RetrieveJob::retry(): not implemented"); } + +//------------------------------------------------------------------------------ +// selectedTapeFile +//------------------------------------------------------------------------------ +cta::common::dataStructures::TapeFile& cta::RetrieveJob::selectedTapeFile() { + try { + return archiveFile.tapeFiles.at(selectedCopyNb); + } catch (std::out_of_range &ex) { + auto __attribute__((__unused__)) & debug=ex; + throw; + } +} + +//------------------------------------------------------------------------------ +// selectedTapeFile +//------------------------------------------------------------------------------ +const cta::common::dataStructures::TapeFile& cta::RetrieveJob::selectedTapeFile() const { + try { + return archiveFile.tapeFiles.at(selectedCopyNb); + } catch (std::out_of_range &ex) { + auto __attribute__((__unused__)) & debug=ex; + throw; + } +} + diff --git a/scheduler/RetrieveJob.hpp b/scheduler/RetrieveJob.hpp index d443a22f35db971d49c0b82e5dd5503eab9630b0..40ea4dfc8633f803b8f995beb44c1070ef62de7a 100644 --- a/scheduler/RetrieveJob.hpp +++ b/scheduler/RetrieveJob.hpp @@ -53,9 +53,9 @@ public: * @param positioningMethod the positioning method */ RetrieveJob(RetrieveMount &mount, - const common::dataStructures::ArchiveFile &archiveFile, - const std::string &remotePath, - const common::dataStructures::TapeFile &tapeFileLocation, + const common::dataStructures::RetrieveRequest &retrieveRequest, + const common::dataStructures::ArchiveFile & archiveFile, + const uint64_t selectedCopyNb, const PositioningMethod positioningMethod); 
 private:
@@ -93,6 +103,16 @@ public:
    */
   void retry();

+  /**
+   * Helper function returning a reference to the currently selected tape file.
+   */
+  common::dataStructures::TapeFile & selectedTapeFile();
+
+  /**
+   * Helper function returning a reference to the currently selected tape file (const variant).
+   */
+  const common::dataStructures::TapeFile & selectedTapeFile() const;
+
   /**
    * The mount to which the job belongs.
    */
@@ -101,17 +111,17 @@ public:
   /**
-   * The NS archive file information
+   * The queued retrieve request
    */
-  common::dataStructures::ArchiveFile archiveFile;
+  common::dataStructures::RetrieveRequest retrieveRequest;

   /**
-   * The remote file path
+   * The full information about the file
    */
-  std::string remotePath;
+  common::dataStructures::ArchiveFile archiveFile;

   /**
-   * The location of the tape file
+   * CopyNb of the selected tape file
    */
-  common::dataStructures::TapeFile tapeFile;
+  uint64_t selectedCopyNb;

   /**
    * The positioning method
diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp
index 617617b55720bf70814db07f925a0f91402f3b24..78b6d53cc121da9551abeaa3e22428b3201d587b 100644
--- a/scheduler/RetrieveMount.cpp
+++ b/scheduler/RetrieveMount.cpp
@@ -77,7 +77,7 @@ std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob() {
     return std::unique_ptr<cta::RetrieveJob>();
   // We have something to retrieve: prepare the response
   std::unique_ptr<cta::RetrieveJob> ret (new RetrieveJob(*this,
-    dbJob->archiveFile, dbJob->remoteFile, dbJob->tapeFile,
+    dbJob->retrieveRequest, dbJob->archiveFile, dbJob->selectedCopyNb,
     PositioningMethod::ByBlock));
   ret->m_dbJob.reset(dbJob.release());
   return ret;
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index 9b120b926c942d4c1d316cb8fe747a38b3c741f7..5e537a24cc10db7a0e35dba2b42bf00044c7d93b 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -354,6 +354,23 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(const std::string &
   mountInfo = m_db.getMountInfo();
   __attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo;

+  // The library information is not known for the tapes involved in retrieves. We
+  // need to query the catalogue now about all those tapes.
+  // Build the list of tapes.
+  std::set<std::string> tapeSet;
+  for (auto &m:mountInfo->potentialMounts) {
+    if (m.type==cta::MountType::RETRIEVE) tapeSet.insert(m.vid);
+  }
+  if (tapeSet.size()) {
+    auto tapesInfo=m_catalogue.getTapesByVid(tapeSet);
+    for (auto &m:mountInfo->potentialMounts) {
+      if (m.type==cta::MountType::RETRIEVE) {
+        m.logicalLibrary=tapesInfo[m.vid].logicalLibraryName;
+        m.tapePool=tapesInfo[m.vid].tapePoolName;
+      }
+    }
+  }
+
   // We should now filter the potential mounts to keep only the ones we are
   // compatible with (match the logical library for retrieves).
// We also only want the potential mounts for which we still have diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 3990ab6b4d578897852c280a419971c9bd26baca..9ddacac917c362f43d3fb557bee0b17256660b0d 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -320,9 +320,9 @@ public: class RetrieveJob { friend class RetrieveMount; public: - std::string remoteFile; + cta::common::dataStructures::RetrieveRequest retrieveRequest; cta::common::dataStructures::ArchiveFile archiveFile; - cta::common::dataStructures::TapeFile tapeFile; + uint64_t selectedCopyNb; virtual void succeed() = 0; virtual void fail() = 0; virtual ~RetrieveJob() {} diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index d936990a3939d64c60e93a686ca8d5159c9ded96..2358bccf89c62de270c292abb5949cf2ce03097b 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -281,7 +281,7 @@ TEST_P(SchedulerTest, DISABLED_delete_archive_request) { request.diskpoolThroughput=200*1000*1000; request.diskFileInfo=diskFileInfo; request.diskFileID="diskFileID"; - request.instance="cms"; + request.instance="disk_instance"; request.fileSize=100*1000*1000; cta::common::dataStructures::UserIdentity requester; requester.name = s_userName; @@ -323,7 +323,7 @@ TEST_P(SchedulerTest, DISABLED_delete_archive_request) { ASSERT_FALSE(found); } -TEST_P(SchedulerTest, DISABLED_archive_and_retrieve_new_file) { +TEST_P(SchedulerTest, archive_and_retrieve_new_file) { using namespace cta; Scheduler &scheduler = getScheduler(); @@ -501,7 +501,7 @@ TEST_P(SchedulerTest, DISABLED_retry_archive_until_max_reached) { request.diskpoolThroughput=200*1000*1000; request.diskFileInfo=diskFileInfo; request.diskFileID="diskFileID"; - request.instance="cms"; + request.instance="disk_instance"; request.fileSize=100*1000*1000; cta::common::dataStructures::UserIdentity requester; requester.name = s_userName; diff --git a/scheduler/testingMocks/MockRetrieveJob.hpp b/scheduler/testingMocks/MockRetrieveJob.hpp index 43aa568c8f9c436ff9c27ae08224020ef85e66b7..be9417e464436c7e7dfa43cfa8a2445429566e2a 100644 --- a/scheduler/testingMocks/MockRetrieveJob.hpp +++ b/scheduler/testingMocks/MockRetrieveJob.hpp @@ -28,9 +28,11 @@ namespace cta { int completes; int failures; MockRetrieveJob(RetrieveMount & rm): cta::RetrieveJob(rm, - cta::common::dataStructures::ArchiveFile(), - std::string(), cta::common::dataStructures::TapeFile(), - cta::PositioningMethod::ByBlock), completes(0), failures(0) {} + cta::common::dataStructures::RetrieveRequest(), + cta::common::dataStructures::ArchiveFile(), 1, + cta::PositioningMethod::ByBlock), completes(0), failures(0) { + archiveFile.tapeFiles[1]; + } virtual void complete() { completes++; } virtual void failed() { failures++; }; diff --git a/tapeserver/castor/log/LogContextTest.cpp b/tapeserver/castor/log/LogContextTest.cpp index 23521528446a20a4b7dcce119ba9dbb0b24cfeba..340fce86d54f4acf5edad83261493d13f48dc0ce 100644 --- a/tapeserver/castor/log/LogContextTest.cpp +++ b/tapeserver/castor/log/LogContextTest.cpp @@ -37,12 +37,12 @@ namespace unitTests { ASSERT_EQ(1U, lc.size()); { // Create an anonymous variable (for its scope) - LogContext::ScopedParam sp(lc, Param("NSFILEID", 12345)); + LogContext::ScopedParam sp(lc, Param("archiveFileID", 12345)); ASSERT_EQ(2U, lc.size()); lc.log(LOG_DEBUG, "Two params message"); { // Test that we do not allow duplicate params - LogContext::ScopedParam sp(lc, Param("NSFILEID", 123456)); + LogContext::ScopedParam sp(lc, 
Param("archiveFileID", 123456)); ASSERT_EQ(2U, lc.size()); LogContext::ScopedParam sp2(lc, Param("TPVID", "T1234")); ASSERT_EQ(3U, lc.size()); @@ -62,16 +62,16 @@ namespace unitTests { std::string first = sl.getLog(); ASSERT_NE(std::string::npos, first.find("MigrationRequestId")); { - LogContext::ScopedParam sp(lc, Param("NSFILEID", 12345)); + LogContext::ScopedParam sp(lc, Param("archiveFileID", 12345)); lc.log(LOG_INFO, "Second log"); } std::string second = sl.getLog(); - ASSERT_NE(std::string::npos, second.find("NSFILEID")); + ASSERT_NE(std::string::npos, second.find("archiveFileID")); // We expect the NSFILEID parameter to show up only once (i.e, not after // offset, which marks the end of its first occurrence). lc.log(LOG_INFO, "Third log"); std::string third = sl.getLog(); - size_t offset = third.find("NSFILEID") + strlen("NSFILEID"); - ASSERT_EQ(std::string::npos, third.find("NSFILEID", offset)); + size_t offset = third.find("archiveFileID") + strlen("archiveFileID"); + ASSERT_EQ(std::string::npos, third.find("archiveFileID", offset)); } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp index 6bcc84929a0e7e3d9baead467a77bf8ddc5bec0b..d5ee145ca3ad47b2f42868a8320daaaeb76240c3 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp @@ -51,9 +51,9 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc, castor::utils::Timer totalTime(localTime); castor::utils::Timer transferTime(localTime); log::ScopedParamContainer URLcontext(lc); - URLcontext.add("NSFILEID",m_retrieveJob->archiveFile.archiveFileID) - .add("path", m_retrieveJob->remotePath) - .add("fSeq",m_retrieveJob->tapeFile.fSeq); + URLcontext.add("archiveFileID",m_retrieveJob->retrieveRequest.archiveFileID) + .add("dstURL", m_retrieveJob->retrieveRequest.dstURL) + .add("fSeq",m_retrieveJob->selectedTapeFile().fSeq); // This out-of-try-catch variables allows us to record the stage of the // process we're in, and to count the error if it occurs. // We will not record errors for an empty string. This will allow us to @@ -88,7 +88,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc, // Synchronise the counter with the open time counter. 
currentErrorToCount = "Error_diskOpenForWrite"; transferTime = localTime; - writeFile.reset(fileFactory.createWriteFile(m_retrieveJob->remotePath)); + writeFile.reset(fileFactory.createWriteFile(m_retrieveJob->retrieveRequest.dstURL)); URLcontext.add("actualURL", writeFile->URL()); lc.log(LOG_INFO, "Opened disk file for writing"); m_stats.openingTime+=localTime.secs(castor::utils::Timer::resetCounter); @@ -206,12 +206,12 @@ void DiskWriteTask::releaseAllBlock(){ //------------------------------------------------------------------------------ void DiskWriteTask::checkErrors(MemBlock* mb,int blockId,castor::log::LogContext& lc){ using namespace castor::log; - if(m_retrieveJob->archiveFile.archiveFileID != static_cast<unsigned int>(mb->m_fileid) + if(m_retrieveJob->retrieveRequest.archiveFileID != static_cast<unsigned int>(mb->m_fileid) || blockId != mb->m_fileBlock || mb->isFailed() ){ LogContext::ScopedParam sp[]={ - LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)), - LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)), - LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)), + LogContext::ScopedParam(lc, Param("received_archiveFileID", mb->m_fileid)), + LogContext::ScopedParam(lc, Param("expected_NSBLOCKId", blockId)), + LogContext::ScopedParam(lc, Param("received_NSBLOCKId", mb->m_fileBlock)), LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed())) }; tape::utils::suppresUnusedVariable(sp); @@ -259,8 +259,8 @@ void DiskWriteTask::logWithStat(int level,const std::string& msg,log::LogContext m_stats.transferTime?1.0*m_stats.dataVolume/1000/1000/m_stats.transferTime:0) .add("openRWCloseToTransferTimeRatio", m_stats.transferTime?(m_stats.openingTime+m_stats.readWriteTime+m_stats.closingTime)/m_stats.transferTime:0.0) - .add("FILEID",m_retrieveJob->archiveFile.archiveFileID) - .add("path",m_retrieveJob->remotePath); + .add("archiveFileID",m_retrieveJob->retrieveRequest.archiveFileID) + .add("dstURL",m_retrieveJob->retrieveRequest.dstURL); lc.log(level,msg); } }}}} diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index e471b7712f4ce49c9a45ca15f415f73eb83f08c9..ca7e8b52bdfa1a8c67de2914176bd24d05578c92 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -33,6 +33,7 @@ #include "scheduler/Scheduler.hpp" #include "remotens/MockRemoteNS.hpp" +#include "scheduler/testingMocks/MockRetrieveMount.hpp" #include <memory> #include <gtest/gtest.h> @@ -53,9 +54,9 @@ namespace unitTests{ class TestingRetrieveJob: public cta::RetrieveJob { public: - TestingRetrieveJob(): cta::RetrieveJob(*((cta::RetrieveMount *)NULL), - cta::common::dataStructures::ArchiveFile(), - std::string(), cta::common::dataStructures::TapeFile(), + TestingRetrieveJob(cta::RetrieveMount & rm): cta::RetrieveJob(rm, + cta::common::dataStructures::RetrieveRequest(), + cta::common::dataStructures::ArchiveFile(), 1, cta::PositioningMethod::ByBlock) {} }; @@ -102,8 +103,11 @@ namespace unitTests{ RecallMemoryManager mm(10,100,lc); DiskFileFactory fileFactory("RFIO",""); - std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob()); - fileToRecall->archiveFile.archiveFileID = 0; + cta::MockRetrieveMount mrm; + std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob(mrm)); + fileToRecall->retrieveRequest.archiveFileID = 1; + fileToRecall->selectedCopyNb=1; + 
diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
index e471b7712f4ce49c9a45ca15f415f73eb83f08c9..ca7e8b52bdfa1a8c67de2914176bd24d05578c92 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
@@ -33,6 +33,7 @@
 #include "scheduler/Scheduler.hpp"
 #include "remotens/MockRemoteNS.hpp"
+#include "scheduler/testingMocks/MockRetrieveMount.hpp"
 #include <memory>
 #include <gtest/gtest.h>
@@ -53,9 +54,9 @@ namespace unitTests{
   class TestingRetrieveJob: public cta::RetrieveJob {
   public:
-    TestingRetrieveJob(): cta::RetrieveJob(*((cta::RetrieveMount *)NULL),
-      cta::common::dataStructures::ArchiveFile(),
-      std::string(), cta::common::dataStructures::TapeFile(),
+    TestingRetrieveJob(cta::RetrieveMount & rm): cta::RetrieveJob(rm,
+      cta::common::dataStructures::RetrieveRequest(),
+      cta::common::dataStructures::ArchiveFile(), 1,
       cta::PositioningMethod::ByBlock) {}
   };
@@ -102,8 +103,11 @@ namespace unitTests{
     RecallMemoryManager mm(10,100,lc);
     DiskFileFactory fileFactory("RFIO","");
-    std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob());
-    fileToRecall->archiveFile.archiveFileID = 0;
+    cta::MockRetrieveMount mrm;
+    std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob(mrm));
+    fileToRecall->retrieveRequest.archiveFileID = 1;
+    fileToRecall->selectedCopyNb=1;
+    fileToRecall->archiveFile.tapeFiles[1];
     DiskWriteTask t(fileToRecall.release(),mm);
     for(int i=0;i<6;++i){
       MemBlock* mb=mm.getFreeBlock();
diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
index 7368bf96aadadf65714e50a5769cb4264dee1f5a..2d545032795fa1dd0361de2a9fd1fd35afe547d5 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
@@ -49,8 +49,8 @@ namespace unitTests{
   class TestingRetrieveJob: public cta::RetrieveJob {
   public:
     TestingRetrieveJob(): cta::RetrieveJob(*((cta::RetrieveMount *)NULL),
-      cta::common::dataStructures::ArchiveFile(),
-      std::string(), cta::common::dataStructures::TapeFile(),
+      cta::common::dataStructures::RetrieveRequest(),
+      cta::common::dataStructures::ArchiveFile(), 1,
       cta::PositioningMethod::ByBlock) {}
   };
@@ -110,9 +110,11 @@ namespace unitTests{
     for(int i=0;i<5;++i){
       std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob());
-      fileToRecall->archiveFile.archiveFileID = i+1;
-      fileToRecall->remotePath = "/dev/null";
-      fileToRecall->tapeFile.blockId = 1;
+      fileToRecall->retrieveRequest.archiveFileID = i+1;
+      fileToRecall->retrieveRequest.dstURL = "/dev/null";
+      fileToRecall->selectedCopyNb=1;
+      fileToRecall->archiveFile.tapeFiles[1];
+      fileToRecall->selectedTapeFile().blockId = 1;
       DiskWriteTask* t=new DiskWriteTask(fileToRecall.release(),mm);
       MemBlock* mb=mm.getFreeBlock();
       mb->m_fileid=i+1;
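Both tests create the entry for copy 1 with a bare fileToRecall->archiveFile.tapeFiles[1]; before touching selectedTapeFile(). That statement is presumably there for its side effect: std::map::operator[] default-constructs a missing entry, so the later lookup finds something to return. A small self-contained illustration of the idiom, using toy types rather than the CTA data structures:

  #include <cassert>
  #include <cstdint>
  #include <map>

  struct TapeFileSketch { uint64_t fSeq = 0; uint64_t blockId = 0; };

  int main() {
    std::map<uint64_t, TapeFileSketch> tapeFiles;
    tapeFiles[1];                     // inserts a default-constructed entry for copy 1
    assert(tapeFiles.count(1) == 1);  // the entry now exists...
    tapeFiles.at(1).blockId = 1;      // ...and can be filled in afterwards
    return 0;
  }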
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp
index 4b2265070fe99c97599fdeacfa37c327059134d2..7ccf5ab1b3a519dd3612e67f009c8bbfe51152cf 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp
@@ -56,7 +56,7 @@ namespace daemon {
     for(auto it= jobs.begin();it!=jobs.end();++it){
       const uint64_t fileSize = (*it)->archiveFile.fileSize;
       LogContext::ScopedParam sp[]={
-        LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->archiveFile.archiveFileID)),
+        LogContext::ScopedParam(m_lc, Param("archiveFileID", (*it)->archiveFile.archiveFileID)),
         LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->tapeFile.fSeq)),
         LogContext::ScopedParam(m_lc, Param("path", (*it)->srcURL))
       };
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp
index 201676edcbf9be533d9374b5e585c72f0e56ae77..7063d18ba6ab7b3eedfa284c7bc0fbb7394fb6c4 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp
@@ -90,10 +90,10 @@ void RecallTaskInjector::injectBulkRecalls(std::vector<std::unique_ptr<cta::Retr
     (*it)->positioningMethod=cta::PositioningMethod::ByBlock;
     LogContext::ScopedParam sp[]={
-      LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->archiveFile.archiveFileID)),
-      LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->tapeFile.fSeq)),
-      LogContext::ScopedParam(m_lc, Param("blockID", (*it)->tapeFile.blockId)),
-      LogContext::ScopedParam(m_lc, Param("path", (*it)->remotePath))
+      LogContext::ScopedParam(m_lc, Param("archiveFileID", (*it)->retrieveRequest.archiveFileID)),
+      LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->selectedTapeFile().fSeq)),
+      LogContext::ScopedParam(m_lc, Param("blockID", (*it)->selectedTapeFile().blockId)),
+      LogContext::ScopedParam(m_lc, Param("dstURL", (*it)->retrieveRequest.dstURL))
     };
     tape::utils::suppresUnusedVariable(sp);
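Both injectors keep the same logging pattern: an array of ScopedParam objects pins the per-file parameters to the log context for as long as the enclosing scope lives, and suppresUnusedVariable() only silences the compiler. A stripped-down sketch of that RAII idea with toy classes (this is not the castor::log implementation, just the shape of the mechanism):

  #include <iostream>
  #include <string>
  #include <vector>

  // Toy log context: parameters pushed here are appended to every message
  // logged while they remain in scope.
  struct ToyLogContext {
    std::vector<std::string> params;
    void log(const std::string & msg) {
      std::cout << msg;
      for (const auto & p : params) std::cout << " " << p;
      std::cout << std::endl;
    }
  };

  // RAII helper: adds a key=value pair on construction, removes it on destruction.
  class ToyScopedParam {
  public:
    ToyScopedParam(ToyLogContext & lc, const std::string & kv): m_lc(lc) {
      m_lc.params.push_back(kv);
    }
    ~ToyScopedParam() { m_lc.params.pop_back(); }
  private:
    ToyLogContext & m_lc;
  };

  int main() {
    ToyLogContext lc;
    {
      // Like the sp[] arrays in the injectors: the variables exist only to keep
      // the parameters attached while messages are logged in this scope.
      ToyScopedParam sp[] = { {lc, "archiveFileID=1234"}, {lc, "fSeq=42"} };
      (void)sp;                       // silence the unused-variable warning
      lc.log("Recall job injected");  // carries both parameters
    }
    lc.log("After scope");            // parameters are gone again
    return 0;
  }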
diff --git a/tapeserver/castor/tape/tapeserver/daemon/ReportPackerInterface.hpp b/tapeserver/castor/tape/tapeserver/daemon/ReportPackerInterface.hpp
index b18319dbdc8c20ee38d5457a7a082d78f62f179b..5ef693e4c624a02a57cd55d446c2be91e1b46b09 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/ReportPackerInterface.hpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/ReportPackerInterface.hpp
@@ -78,7 +78,7 @@ template <class PlaceHolder> class ReportPackerInterface{
     for(typename C::const_iterator it=c.begin();it!=c.end();++it) {
       log::ScopedParamContainer sp(m_lc);
-      sp.add("NSFILEID",(*it)->fileid())
+      sp.add("archiveFileID",(*it)->fileid())
         .add("NSFSEQ", (*it)->fseq())
         .add("NSHOST", (*it)->nshost())
         .add("NSFILETRANSACTIONID", (*it)->fileTransactionId());
@@ -97,7 +97,7 @@ template <class PlaceHolder> class ReportPackerInterface{
     for(typename C::const_iterator it=c.begin();it!=c.end();++it) {
       log::ScopedParamContainer sp(m_lc);
-      sp.add("NSFILEID",(*it)->fileid())
+      sp.add("archiveFileID",(*it)->fileid())
        .add("NSFSEQ", (*it)->fseq())
        .add("NSHOST", (*it)->nshost())
        .add("NSFILETRANSACTIONID", (*it)->fileTransactionId())
diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp
index 4470baeab94ba52f3d08be0f7f91f479b99c7adc..cad0cc09d9e2fae4deefaf51ee5fc8ad427293ed 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp
@@ -69,10 +69,10 @@ public:
     // Set the common context for all the coming logs (file info)
     log::ScopedParamContainer params(lc);
-    params.add("NSFILEID", m_retrieveJob->archiveFile.archiveFileID)
-          .add("BlockId", m_retrieveJob->tapeFile.blockId)
-          .add("fSeq", m_retrieveJob->tapeFile.fSeq)
-          .add("path", m_retrieveJob->remotePath);
+    params.add("archiveFileID", m_retrieveJob->archiveFile.archiveFileID)
+          .add("BlockId", m_retrieveJob->selectedTapeFile().blockId)
+          .add("fSeq", m_retrieveJob->selectedTapeFile().fSeq)
+          .add("dstURL", m_retrieveJob->retrieveRequest.dstURL);
     // We will clock the stats for the file itself, and eventually add those
     // stats to the session's.
@@ -102,7 +102,7 @@ public:
       lc.log(LOG_INFO, "Successfully positioned for reading");
       localStats.positionTime += timer.secs(castor::utils::Timer::resetCounter);
-      watchdog.notifyBeginNewJob(m_retrieveJob->archiveFile.archiveFileID, m_retrieveJob->tapeFile.fSeq);
+      watchdog.notifyBeginNewJob(m_retrieveJob->archiveFile.archiveFileID, m_retrieveJob->selectedTapeFile().fSeq);
       localStats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
       currentErrorToCount = "Error_tapeReadData";
       while (stillReading) {
@@ -110,9 +110,9 @@ public:
         mb=m_mm.getFreeBlock();
         localStats.waitFreeMemoryTime += timer.secs(castor::utils::Timer::resetCounter);
-        mb->m_fSeq = m_retrieveJob->tapeFile.fSeq;
+        mb->m_fSeq = m_retrieveJob->selectedTapeFile().fSeq;
         mb->m_fileBlock = fileBlock++;
-        mb->m_fileid = m_retrieveJob->archiveFile.archiveFileID;
+        mb->m_fileid = m_retrieveJob->retrieveRequest.archiveFileID;
         mb->m_tapeFileBlock = tapeBlock;
         mb->m_tapeBlockSize = rf->getBlockSize();
         try {
@@ -192,8 +192,8 @@ public:
    */
   void reportCancellationToDiskTask(){
     MemBlock* mb =m_mm.getFreeBlock();
-    mb->m_fSeq = m_retrieveJob->tapeFile.fSeq;
-    mb->m_fileid = m_retrieveJob->archiveFile.archiveFileID;
+    mb->m_fSeq = m_retrieveJob->selectedTapeFile().fSeq;
+    mb->m_fileid = m_retrieveJob->retrieveRequest.archiveFileID;
     //mark the block cancelled and push it (plus signal the end)
     mb->markAsCancelled();
     m_fifo.pushDataBlock(mb);
@@ -209,8 +209,8 @@ private:
     // fill it up
     if (!mb) {
       mb=m_mm.getFreeBlock();
-      mb->m_fSeq = m_retrieveJob->tapeFile.fSeq;
-      mb->m_fileid = m_retrieveJob->archiveFile.archiveFileID;
+      mb->m_fSeq = m_retrieveJob->selectedTapeFile().fSeq;
+      mb->m_fileid = m_retrieveJob->retrieveRequest.archiveFileID;
     }
     //mark the block failed and push it (plus signal the end)
     mb->markAsFailed(msg,code);
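TapeReadTask stamps every MemBlock it fills with the job's file identity and fSeq, and the disk side (DiskWriteTask::checkErrors above, and the mirror-image check in TapeWriteTask below) refuses blocks whose identity does not match the job being written. A sketch of that consistency check with simplified stand-in types (not the real MemBlock):

  #include <cstdint>
  #include <stdexcept>
  #include <string>

  // Simplified stand-in for the identity fields a memory block carries between
  // the tape-read and disk-write sides.
  struct BlockSketch {
    uint64_t fileId = 0;   // stamped by the side that filled the block
    int fileBlock = 0;     // sequence of the block within the file
    bool failed = false;
  };

  // Reject a block whose identity does not match the job currently being written,
  // mirroring the checkErrors() logic in the diff.
  void checkBlock(const BlockSketch & mb, uint64_t expectedFileId, int expectedBlockId) {
    if (mb.fileId != expectedFileId || mb.fileBlock != expectedBlockId || mb.failed) {
      throw std::runtime_error("received a bad block for file " + std::to_string(expectedFileId));
    }
  }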
diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp
index 0963d94a3f5d84d87806ca548926be0b4a544f0a..2ca781885224aedd87e47591e2541d41425a46aa 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp
@@ -226,9 +226,9 @@ namespace daemon {
         || mb->isFailed() || mb->isCanceled()) {
       LogContext::ScopedParam sp[]={
-        LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
-        LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", memBlockId)),
-        LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
+        LogContext::ScopedParam(lc, Param("received_archiveFileID", mb->m_fileid)),
+        LogContext::ScopedParam(lc, Param("expected_NSBLOCKId", memBlockId)),
+        LogContext::ScopedParam(lc, Param("received_NSBLOCKId", mb->m_fileBlock)),
         LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed()))
       };
       tape::utils::suppresUnusedVariable(sp);
@@ -319,7 +319,7 @@ namespace daemon {
           .add("payloadTransferSpeedMBps",m_taskStats.totalTime?
              1.0*m_taskStats.dataVolume/1000/1000/m_taskStats.totalTime:0.0)
           .add("fileSize",m_archiveFile.fileSize)
-          .add("NSFILEID",m_archiveFile.archiveFileID)
+          .add("archiveFileID",m_archiveFile.archiveFileID)
          .add("fSeq",m_tapeFile.fSeq)
          .add("reconciliationTime",m_archiveFile.reconciliationTime)
          .add("LBPMode", m_LBPMode);
diff --git a/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp b/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp
index 0d0683fc3448e5958ba7ec8ed0e14665b9d6bf5c..3716111d8c403e4d571c61ee23822a2519ff7873 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp
@@ -384,7 +384,7 @@ private:
     castor::log::ScopedParamContainer params(m_lc);
     params.add("TimeSinceLastBlockMove", m_blockMovementTimer.secs())
           .add("fileId", m_fileId)
-          .add("NSFILEID",m_fileId)
+          .add("archiveFileID",m_fileId)
           .add("fSeq",m_fSeq);
     m_lc.log(LOG_WARNING, "No tape block movement for too long");
   }
@@ -435,7 +435,7 @@ private:
   virtual void logStuckFile() {
     castor::log::ScopedParamContainer params(m_lc);
     params.add("TimeSinceLastBlockMove", m_blockMovementTimer.secs())
-          .add("NSFILEID",m_fileId)
+          .add("archiveFileID",m_fileId)
           .add("fSeq",m_fSeq);
     m_lc.log(LOG_WARNING, "No tape block movement for too long");
   }
Wanted: " - << fileToRecall.tapeFile.fSeq; + << fileToRecall.selectedTapeFile().fSeq; throw TapeFormatError(ex_str.str()); } } @@ -214,16 +214,16 @@ namespace castor { m_session->release(); } void ReadFile::positionByFseq(const cta::RetrieveJob &fileToRecall) { - if(fileToRecall.tapeFile.fSeq<1) { + if(fileToRecall.selectedTapeFile().fSeq<1) { std::stringstream err; err << "Unexpected fileId in ReadFile::position with fSeq expected >=1, got: " - << fileToRecall.tapeFile.fSeq << ")"; + << fileToRecall.selectedTapeFile().fSeq << ")"; throw castor::exception::InvalidArgument(err.str()); } - int64_t fSeq_delta = fileToRecall.tapeFile.fSeq + int64_t fSeq_delta = fileToRecall.selectedTapeFile().fSeq - m_session->getCurrentFseq(); - if(fileToRecall.tapeFile.fSeq == 1) { + if(fileToRecall.selectedTapeFile().fSeq == 1) { // special case: we can rewind the tape to be faster //(TODO: in the future we could also think of a threshold above //which we rewind the tape anyway and then space forward) @@ -253,15 +253,16 @@ namespace castor { } void ReadFile::positionByBlockID(const cta::RetrieveJob &fileToRecall) { - if(fileToRecall.tapeFile.blockId > - std::numeric_limits<decltype(fileToRecall.tapeFile.blockId)>::max()){ + if(fileToRecall.selectedTapeFile().blockId > + std::numeric_limits<decltype(fileToRecall.selectedTapeFile().blockId)>::max()){ std::stringstream ex_str; - ex_str << "[ReadFile::positionByBlockID] - Block id larger than the supported uint32_t limit: " << fileToRecall.tapeFile.blockId; + ex_str << "[ReadFile::positionByBlockID] - Block id larger than the supported uint32_t limit: " + << fileToRecall.selectedTapeFile().blockId; throw castor::exception::Exception(ex_str.str()); } // if we want the first file on tape (fileInfo.blockId==0) we need to skip the VOL1 header - const uint32_t destination_block = fileToRecall.tapeFile.blockId ? - fileToRecall.tapeFile.blockId : 1; + const uint32_t destination_block = fileToRecall.selectedTapeFile().blockId ? 
diff --git a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp
index 8fd1047c08374d63d28ca2dd6a168c2353be5af8..9f630ecd83fb7d3f47300ee2a942cc55db339679 100644
--- a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp
@@ -40,8 +40,8 @@ namespace unitTests {
   class TestingRetrieveJob: public cta::RetrieveJob {
   public:
     TestingRetrieveJob() : cta::RetrieveJob(*((cta::RetrieveMount *)NULL),
-      cta::common::dataStructures::ArchiveFile(),
-      std::string(), cta::common::dataStructures::TapeFile(),
+      cta::common::dataStructures::RetrieveRequest(),
+      cta::common::dataStructures::ArchiveFile(), 1,
       cta::PositioningMethod::ByBlock) {}
   };
@@ -58,9 +58,11 @@ namespace unitTests {
   virtual void SetUp() {
     block_size = 262144;
     label = "K00001";
-    fileToRecall.tapeFile.blockId = 0;
-    fileToRecall.tapeFile.fSeq = 1;
-    fileToRecall.archiveFile.archiveFileID = 1;
+    fileToRecall.selectedCopyNb=1;
+    fileToRecall.archiveFile.tapeFiles[1];
+    fileToRecall.selectedTapeFile().blockId = 0;
+    fileToRecall.selectedTapeFile().fSeq = 1;
+    fileToRecall.retrieveRequest.archiveFileID = 1;
     fileToMigrate.archiveFile.fileSize = 500;
     fileToMigrate.archiveFile.archiveFileID = 1;
     fileToMigrate.tapeFile.fSeq = 1;