diff --git a/ReleaseNotes.md b/ReleaseNotes.md
index 5e29a3d17c5d6e4b0513a24dc45a9ed16cbea55c..690028ea4453e66dc667f097e6d2ad044ba6dd13 100644
--- a/ReleaseNotes.md
+++ b/ReleaseNotes.md
@@ -7,6 +7,7 @@
 ## Features
 
 ## Bug fixes
+- cta/CTA#1102 - Make requeued jobs retain their original creation time
 
 # v4.5.1-1
 
diff --git a/objectstore/ArchiveQueueAlgorithms.cpp b/objectstore/ArchiveQueueAlgorithms.cpp
index 2a94443ea74a9b7f1b613a75288b440c165254c8..ac92a9285c8046f0dc62e9e3947e70d8db601903 100644
--- a/objectstore/ArchiveQueueAlgorithms.cpp
+++ b/objectstore/ArchiveQueueAlgorithms.cpp
@@ -39,8 +39,9 @@ void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, Inse
     jd.tapePool = cont.getTapePool();
     jd.owner = cont.getAddressIfSet();
     ArchiveRequest & ar = *e.archiveRequest;
+    auto creationLog = ar.getCreationLog();
     jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
-      e.mountPolicy, time(nullptr)});
+      e.mountPolicy, creationLog.time});
   }
   cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
 }
diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp
index ace82492c0e15a29e0667b593c7d589f0c5d8f33..5e9a737ededbe40e4903f450b95638446b3db974 100644
--- a/objectstore/RetrieveQueueAlgorithms.hpp
+++ b/objectstore/RetrieveQueueAlgorithms.hpp
@@ -298,7 +298,7 @@ addReferencesIfNecessaryAndCommit(Container& cont, typename InsertedElement::lis
   std::list<RetrieveQueue::JobToAdd> jobsToAdd;
   for (auto &e : elemMemCont) {
     RetrieveRequest &rr = *e.retrieveRequest;
-    jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activity, e.diskSystemName});
+    jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, rr.getCreationTime(), e.activity, e.diskSystemName});
   }
   cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
 }
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index 825358b72f36b87f568231636c10b13d6a2e239e..4c2c659a6d979c914a4164278a54871d5f2f1dd1 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -1033,6 +1033,11 @@ void RetrieveRequest::setCreationTime(const uint64_t creationTime){
   m_payload.mutable_lifecycle_timings()->set_creation_time(creationTime);
 }
 
+uint64_t RetrieveRequest::getCreationTime() {
+  checkPayloadReadable();
+  return m_payload.lifecycle_timings().creation_time();
+}
+
 void RetrieveRequest::setFirstSelectedTime(const uint64_t firstSelectedTime){
   checkPayloadWritable();
   m_payload.mutable_lifecycle_timings()->set_first_selected_time(firstSelectedTime);
diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp
index c398143298037faced152d94794e2424e82ab2e5..0033af4ce77d7ca3ce3c98bb26b9d7ca380c009a 100644
--- a/objectstore/RetrieveRequest.hpp
+++ b/objectstore/RetrieveRequest.hpp
@@ -259,6 +259,8 @@ public:
   cta::common::dataStructures::EntryLog getEntryLog();
   cta::common::dataStructures::LifecycleTimings getLifecycleTimings();
   void setCreationTime(const uint64_t creationTime);
+  uint64_t getCreationTime();
+
   void setFirstSelectedTime(const uint64_t firstSelectedTime);
   void setCompletedTime(const uint64_t completedTime);
   void setReportedTime(const uint64_t reportedTime);
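The object-store changes above all follow the same pattern: at the point where a job is (re)inserted into a queue, the `time(nullptr)` stamp taken at insertion is replaced by the creation time already stored in the request (`ArchiveRequest::getCreationLog().time`, and the new `RetrieveRequest::getCreationTime()`), so a job that is popped and requeued keeps its original age. The sketch below is only an illustration of that idea with simplified stand-in types; `Request`, `JobToAdd` and the two helper functions are hypothetical names, not the CTA object-store interfaces.

// Sketch only: simplified stand-ins, not the CTA object-store classes.
#include <cstdint>
#include <ctime>
#include <list>

struct Request {
  uint64_t fileId;
  time_t creationTime;  // stamped once, when the request is first submitted
};

struct JobToAdd {
  uint64_t fileId;
  time_t startTime;     // drives queue ordering and the job's apparent age
};

// Old behaviour: every (re)queue re-stamps the job, resetting its age.
JobToAdd queueStampedNow(const Request &r) {
  return {r.fileId, ::time(nullptr)};
}

// New behaviour: the stored creation time is carried along, so the age
// survives pop + requeue cycles.
JobToAdd queueWithCreationTime(const Request &r) {
  return {r.fileId, r.creationTime};
}

int main() {
  Request r{42, ::time(nullptr) - 600};       // request created ten minutes ago
  std::list<JobToAdd> queue;
  queue.push_back(queueWithCreationTime(r));  // age preserved across requeues
  return 0;
}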
diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp
index 41a42993218511920b96467ca658931a4b14af3b..84a595180ae1b46f9168fa11529ad8cc68cccff1 100644
--- a/scheduler/SchedulerDatabaseTest.cpp
+++ b/scheduler/SchedulerDatabaseTest.cpp
@@ -443,6 +443,179 @@ TEST_P(SchedulerDatabaseTest, createQueueAndPutToSleep) {
 }
 
 
+TEST_P(SchedulerDatabaseTest, popAndRequeueArchiveRequests) {
+  using namespace cta;
+#ifndef STDOUT_LOGGING
+  cta::log::DummyLogger dl("", "");
+#else
+  cta::log::StdoutLogger dl("", "");
+#endif
+  cta::log::LogContext lc(dl);
+
+  cta::SchedulerDatabase &db = getDb();
+
+  // Inject 10 archive jobs to the db.
+  const size_t filesToDo = 10;
+  std::list<std::future<void>> jobInsertions;
+  std::list<std::function<void()>> lambdas;
+  auto creationTime = time(nullptr);
+  for (auto i: cta::range<size_t>(filesToDo)) {
+    lambdas.emplace_back(
+      [i,&db, creationTime, &lc](){
+        cta::common::dataStructures::ArchiveRequest ar;
+        cta::log::LogContext locallc=lc;
+        cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId afqc;
+        afqc.copyToPoolMap.insert({1, "tapePool"});
+        afqc.fileId = 0;
+        afqc.mountPolicy.name = "mountPolicy";
+        afqc.mountPolicy.archivePriority = 1;
+        afqc.mountPolicy.archiveMinRequestAge = 0;
+        afqc.mountPolicy.retrievePriority = 1;
+        afqc.mountPolicy.retrieveMinRequestAge = 0;
+        afqc.mountPolicy.creationLog = { "u", "h", time(nullptr)};
+        afqc.mountPolicy.lastModificationLog = { "u", "h", time(nullptr)};
+        afqc.mountPolicy.comment = "comment";
+        afqc.fileId = i;
+        ar.archiveReportURL="";
+        ar.checksumBlob.insert(cta::checksum::NONE, "");
+        ar.creationLog = { "user", "host", creationTime};
+        uuid_t fileUUID;
+        uuid_generate(fileUUID);
+        char fileUUIDStr[37];
+        uuid_unparse(fileUUID, fileUUIDStr);
+        ar.diskFileID = fileUUIDStr;
+        ar.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
+        ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID;
+        ar.diskFileInfo.gid = DISK_FILE_GID;
+        ar.fileSize = 1000;
+        ar.requester = { "user", "group" };
+        ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
+        ar.storageClass = "storageClass";
+        db.queueArchive("eosInstance", ar, afqc, locallc);
+      });
+    jobInsertions.emplace_back(std::async(std::launch::async,lambdas.back()));
+  }
+  for (auto &j: jobInsertions) { j.get(); }
+  jobInsertions.clear();
+  lambdas.clear();
+  db.waitSubthreadsComplete();
+
+  // Then load all archive jobs into memory
+  // Create mount.
+  auto mountInfo = db.getMountInfo(lc);
+  cta::catalogue::TapeForWriting tfw;
+  tfw.tapePool = "tapePool";
+  tfw.vid = "vid";
+  auto am = mountInfo->createArchiveMount(common::dataStructures::MountType::ArchiveForUser, tfw, "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr));
+  auto ajb = am->getNextJobBatch(filesToDo, 1000 * filesToDo, lc);
+  // Files with a successful fetch should be popped
+  ASSERT_EQ(10, ajb.size());
+  // Failing the jobs should requeue them
+  for (auto &aj: ajb) {
+    aj->failTransfer("test", lc);
+  }
+  // Jobs in the queue should have been requeued with their original creation log time
+  auto pendingArchiveJobs = db.getArchiveJobs();
+  ASSERT_EQ(pendingArchiveJobs["tapePool"].size(), filesToDo);
+  for(auto &aj: pendingArchiveJobs["tapePool"]) {
+    ASSERT_EQ(aj.request.creationLog.time, creationTime);
+  }
+
+  am->complete(time(nullptr));
+  am.reset(nullptr);
+  mountInfo.reset(nullptr);
+}
+
+TEST_P(SchedulerDatabaseTest, popAndRequeueRetrieveRequests) {
+  using namespace cta;
+#ifndef STDOUT_LOGGING
+  cta::log::DummyLogger dl("", "");
+#else
+  cta::log::StdoutLogger dl("", "");
+#endif
+  cta::log::LogContext lc(dl);
+
+  cta::SchedulerDatabase &db = getDb();
+  //cta::catalogue::Catalogue &catalogue = getCatalogue();
+
+
+  // Create the disk system list
+  cta::disk::DiskSystemList diskSystemList;
+  cta::disk::DiskSystem diskSystem{"ds-A", "$root://a.disk.system/", "constantFreeSpace:999999999999", 60, 10UL*1000*1000*1000,
+    15*60, common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"};
+  diskSystemList.push_back(diskSystem);
+
+  // Inject 10 retrieve jobs to the db.
+  const size_t filesToDo = 10;
+  std::list<std::future<void>> jobInsertions;
+  std::list<std::function<void()>> lambdas;
+  auto creationTime = time(nullptr);
+  for (auto i: cta::range<size_t>(filesToDo)) {
+    lambdas.emplace_back(
+      [i,&db,&lc, creationTime, diskSystemList](){
+        cta::common::dataStructures::RetrieveRequest rr;
+        cta::log::LogContext locallc=lc;
+        cta::common::dataStructures::RetrieveFileQueueCriteria rfqc;
+        rfqc.mountPolicy.name = "mountPolicy";
+        rfqc.mountPolicy.archivePriority = 1;
+        rfqc.mountPolicy.archiveMinRequestAge = 0;
+        rfqc.mountPolicy.retrievePriority = 1;
+        rfqc.mountPolicy.retrieveMinRequestAge = 0;
+        rfqc.mountPolicy.creationLog = { "u", "h", creationTime};
+        rfqc.mountPolicy.lastModificationLog = { "u", "h", creationTime};
+        rfqc.mountPolicy.comment = "comment";
+        rfqc.archiveFile.fileSize = 1000;
+        rfqc.archiveFile.tapeFiles.push_back(cta::common::dataStructures::TapeFile());
+        rfqc.archiveFile.tapeFiles.back().fSeq = i;
+        rfqc.archiveFile.tapeFiles.back().vid = "vid";
+        rr.creationLog = { "user", "host", creationTime};
+        uuid_t fileUUID;
+        uuid_generate(fileUUID);
+        char fileUUIDStr[37];
+        uuid_unparse(fileUUID, fileUUIDStr);
+        rr.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
+        rr.requester = { "user", "group" };
+        std::string dsName = "ds-A";
+        rr.dstURL = std::string ("root://") + "a" + ".disk.system/" + std::to_string(0);
+        db.queueRetrieve(rr, rfqc, dsName, locallc);
+      });
+    jobInsertions.emplace_back(std::async(std::launch::async,lambdas.back()));
+  }
+  for (auto &j: jobInsertions) { j.get(); }
+  jobInsertions.clear();
+  lambdas.clear();
+  db.waitSubthreadsComplete();
+
+  // Then load all retrieve jobs into memory
+  // Create mount.
+  auto mountInfo = db.getMountInfo(lc);
+  ASSERT_EQ(1, mountInfo->potentialMounts.size());
+  auto rm=mountInfo->createRetrieveMount("vid", "tapePool", "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr), cta::nullopt);
+  {
+    auto rjb = rm->getNextJobBatch(10,20*1000,lc);
+    // Files with a successful fetch should be popped
+    ASSERT_EQ(10, rjb.size());
+    // Jobs retain their creation time after being popped
+    for (auto &rj: rjb) {
+      ASSERT_EQ(creationTime, rj->retrieveRequest.lifecycleTimings.creation_time);
+    }
+    // Requeueing and re-popping the jobs does not change the creation time
+    rm->requeueJobBatch(rjb, lc);
+    rjb = rm->getNextJobBatch(10,20*1000,lc);
+    // Files with a successful fetch should be popped
+    ASSERT_EQ(10, rjb.size());
+    // Jobs retain their creation time after being popped
+    for (auto &rj: rjb) {
+      ASSERT_EQ(creationTime, rj->retrieveRequest.lifecycleTimings.creation_time);
+    }
+
+  }
+  rm->complete(time(nullptr));
+  rm.reset(nullptr);
+  mountInfo.reset(nullptr);
+}
+
+
 TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
   using namespace cta;
 #ifndef STDOUT_LOGGING
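The two new tests assert that archive jobs keep their creationLog.time after failTransfer() and that retrieve jobs keep their lifecycleTimings.creation_time across requeueJobBatch()/getNextJobBatch(). The sketch below illustrates why a preserved creation time matters for mount policies such as the archiveMinRequestAge/retrieveMinRequestAge fields set in the tests; the age check is a simplified assumption for illustration only, not the actual CTA scheduling code.

// Sketch only: a simplified request-age check, not the CTA scheduler logic.
#include <cstdint>
#include <ctime>
#include <iostream>

// Assume a mount is allowed once the oldest queued job has waited at least
// minRequestAge seconds (other criteria, e.g. data volume, are ignored here).
bool oldEnoughToMount(time_t oldestJobStartTime, uint64_t minRequestAge, time_t now) {
  return static_cast<uint64_t>(now - oldestJobStartTime) >= minRequestAge;
}

int main() {
  const time_t created = 1000;         // when the request was first queued
  const time_t now = created + 600;    // ten minutes later, after a requeue
  const uint64_t minRequestAge = 300;  // policy: wait at least five minutes

  // Creation time preserved on requeue: the job already satisfies the age criterion.
  std::cout << oldEnoughToMount(created, minRequestAge, now) << '\n';  // prints 1

  // Creation time re-stamped on requeue: the wait would start over from zero.
  std::cout << oldEnoughToMount(now, minRequestAge, now) << '\n';      // prints 0
  return 0;
}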