Skip to content
Snippets Groups Projects
Commit 65ef7e2e authored by mvelosob's avatar mvelosob
Browse files

make requeued jobs retain their original creationTime(#1102)

parent 5c3fbb0e
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@
## Features
## Bug fixes
- cta/CTA#1102 - Make requeued jobs retain their original creation time
# v4.5.1-1
......
......@@ -39,8 +39,9 @@ void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, Inse
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
auto creationLog = ar.getCreationLog();
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(nullptr)});
e.mountPolicy, ar.getCreationLog().time});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
......
......@@ -298,7 +298,7 @@ addReferencesIfNecessaryAndCommit(Container& cont, typename InsertedElement::lis
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
for (auto &e : elemMemCont) {
RetrieveRequest &rr = *e.retrieveRequest;
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activity, e.diskSystemName});
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, rr.getCreationTime(), e.activity, e.diskSystemName});
}
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
}
......
......@@ -1033,6 +1033,11 @@ void RetrieveRequest::setCreationTime(const uint64_t creationTime){
m_payload.mutable_lifecycle_timings()->set_creation_time(creationTime);
}
uint64_t RetrieveRequest::getCreationTime() {
checkPayloadReadable();
return m_payload.lifecycle_timings().creation_time();
}
void RetrieveRequest::setFirstSelectedTime(const uint64_t firstSelectedTime){
checkPayloadWritable();
m_payload.mutable_lifecycle_timings()->set_first_selected_time(firstSelectedTime);
......
......@@ -259,6 +259,8 @@ public:
cta::common::dataStructures::EntryLog getEntryLog();
cta::common::dataStructures::LifecycleTimings getLifecycleTimings();
void setCreationTime(const uint64_t creationTime);
uint64_t getCreationTime();
void setFirstSelectedTime(const uint64_t firstSelectedTime);
void setCompletedTime(const uint64_t completedTime);
void setReportedTime(const uint64_t reportedTime);
......
......@@ -443,6 +443,179 @@ TEST_P(SchedulerDatabaseTest, createQueueAndPutToSleep) {
}
TEST_P(SchedulerDatabaseTest, popAndRequeueArchiveRequests) {
using namespace cta;
#ifndef STDOUT_LOGGING
cta::log::DummyLogger dl("", "");
#else
cta::log::StdoutLogger dl("", "");
#endif
cta::log::LogContext lc(dl);
cta::SchedulerDatabase &db = getDb();
// Inject 10 archive jobs to the db.
const size_t filesToDo = 10;
std::list<std::future<void>> jobInsertions;
std::list<std::function<void()>> lambdas;
auto creationTime = time(nullptr);
for (auto i: cta::range<size_t>(filesToDo)) {
lambdas.emplace_back(
[i,&db, creationTime, &lc](){
cta::common::dataStructures::ArchiveRequest ar;
cta::log::LogContext locallc=lc;
cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId afqc;
afqc.copyToPoolMap.insert({1, "tapePool"});
afqc.fileId = 0;
afqc.mountPolicy.name = "mountPolicy";
afqc.mountPolicy.archivePriority = 1;
afqc.mountPolicy.archiveMinRequestAge = 0;
afqc.mountPolicy.retrievePriority = 1;
afqc.mountPolicy.retrieveMinRequestAge = 0;
afqc.mountPolicy.creationLog = { "u", "h", time(nullptr)};
afqc.mountPolicy.lastModificationLog = { "u", "h", time(nullptr)};
afqc.mountPolicy.comment = "comment";
afqc.fileId = i;
ar.archiveReportURL="";
ar.checksumBlob.insert(cta::checksum::NONE, "");
ar.creationLog = { "user", "host", creationTime};
uuid_t fileUUID;
uuid_generate(fileUUID);
char fileUUIDStr[37];
uuid_unparse(fileUUID, fileUUIDStr);
ar.diskFileID = fileUUIDStr;
ar.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID;
ar.diskFileInfo.gid = DISK_FILE_GID;
ar.fileSize = 1000;
ar.requester = { "user", "group" };
ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
ar.storageClass = "storageClass";
db.queueArchive("eosInstance", ar, afqc, locallc);
});
jobInsertions.emplace_back(std::async(std::launch::async,lambdas.back()));
}
for (auto &j: jobInsertions) { j.get(); }
jobInsertions.clear();
lambdas.clear();
db.waitSubthreadsComplete();
// Then load all archive jobs into memory
// Create mount.
auto moutInfo = db.getMountInfo(lc);
cta::catalogue::TapeForWriting tfw;
tfw.tapePool = "tapePool";
tfw.vid = "vid";
auto am = moutInfo->createArchiveMount(common::dataStructures::MountType::ArchiveForUser, tfw, "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr));
auto ajb = am->getNextJobBatch(filesToDo, 1000 * filesToDo, lc);
//Files with successful fetch should be popped
ASSERT_EQ(10, ajb.size());
//failing the jobs should requeue them
for (auto &aj: ajb) {
aj->failTransfer("test", lc);
}
//Jobs in queue should have been requeued with original creationlog time
auto pendingArchiveJobs = db.getArchiveJobs();
ASSERT_EQ(pendingArchiveJobs["tapePool"].size(), filesToDo);
for(auto &aj: pendingArchiveJobs["tapePool"]) {
ASSERT_EQ(aj.request.creationLog.time, creationTime);
}
am->complete(time(nullptr));
am.reset(nullptr);
moutInfo.reset(nullptr);
}
TEST_P(SchedulerDatabaseTest, popAndRequeueRetrieveRequests) {
using namespace cta;
#ifndef STDOUT_LOGGING
cta::log::DummyLogger dl("", "");
#else
cta::log::StdoutLogger dl("", "");
#endif
cta::log::LogContext lc(dl);
cta::SchedulerDatabase &db = getDb();
//cta::catalogue::Catalogue &catalogue = getCatalogue();
// Create the disk system list
cta::disk::DiskSystemList diskSystemList;
cta::disk::DiskSystem diskSystem{"ds-A", "$root://a.disk.system/", "constantFreeSpace:999999999999", 60, 10UL*1000*1000*1000,
15*60, common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"};
diskSystemList.push_back(diskSystem);
// Inject 10 retrieve jobs to the db.
const size_t filesToDo = 10;
std::list<std::future<void>> jobInsertions;
std::list<std::function<void()>> lambdas;
auto creationTime = time(nullptr);
for (auto i: cta::range<size_t>(filesToDo)) {
lambdas.emplace_back(
[i,&db,&lc, creationTime, diskSystemList](){
cta::common::dataStructures::RetrieveRequest rr;
cta::log::LogContext locallc=lc;
cta::common::dataStructures::RetrieveFileQueueCriteria rfqc;
rfqc.mountPolicy.name = "mountPolicy";
rfqc.mountPolicy.archivePriority = 1;
rfqc.mountPolicy.archiveMinRequestAge = 0;
rfqc.mountPolicy.retrievePriority = 1;
rfqc.mountPolicy.retrieveMinRequestAge = 0;
rfqc.mountPolicy.creationLog = { "u", "h", creationTime};
rfqc.mountPolicy.lastModificationLog = { "u", "h", creationTime};
rfqc.mountPolicy.comment = "comment";
rfqc.archiveFile.fileSize = 1000;
rfqc.archiveFile.tapeFiles.push_back(cta::common::dataStructures::TapeFile());
rfqc.archiveFile.tapeFiles.back().fSeq = i;
rfqc.archiveFile.tapeFiles.back().vid = "vid";
rr.creationLog = { "user", "host", creationTime};
uuid_t fileUUID;
uuid_generate(fileUUID);
char fileUUIDStr[37];
uuid_unparse(fileUUID, fileUUIDStr);
rr.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
rr.requester = { "user", "group" };
std::string dsName = "ds-A";
rr.dstURL = std::string ("root://") + "a" + ".disk.system/" + std::to_string(0);
db.queueRetrieve(rr, rfqc, dsName, locallc);
});
jobInsertions.emplace_back(std::async(std::launch::async,lambdas.back()));
}
for (auto &j: jobInsertions) { j.get(); }
jobInsertions.clear();
lambdas.clear();
db.waitSubthreadsComplete();
// Then load all retrieve jobs into memory
// Create mount.
auto mountInfo = db.getMountInfo(lc);
ASSERT_EQ(1, mountInfo->potentialMounts.size());
auto rm=mountInfo->createRetrieveMount("vid", "tapePool", "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr), cta::nullopt);
{
auto rjb = rm->getNextJobBatch(10,20*1000,lc);
//Files with successful fetch should be popped
ASSERT_EQ(10, rjb.size());
//jobs retain their creation time after being popped
for (auto &rj: rjb) {
ASSERT_EQ(creationTime, rj->retrieveRequest.lifecycleTimings.creation_time);
}
//requeing and repopping the jobs does not change the creation time
rm->requeueJobBatch(rjb, lc);
rjb = rm->getNextJobBatch(10,20*1000,lc);
//Files with successful fetch should be popped
ASSERT_EQ(10, rjb.size());
//jobs retain their creation time after being popped
for (auto &rj: rjb) {
ASSERT_EQ(creationTime, rj->retrieveRequest.lifecycleTimings.creation_time);
}
}
rm->complete(time(nullptr));
rm.reset(nullptr);
mountInfo.reset(nullptr);
}
TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
using namespace cta;
#ifndef STDOUT_LOGGING
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment