Commit 49aaa23b authored by Eric Cano

Changed the ManyArchiveJobs test to be multithreaded.

The test remains disabled as it still takes quite a bit of time.
parent 8f3bc4c2
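Editorial note: the pattern this commit introduces (a std::list of lambdas paired with a std::list of std::future<void>, launched through std::async and drained with wait()) can be sketched in isolation as below. queueOneJob and jobCount are placeholder names for illustration, not part of the commit.

#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <list>

// Hypothetical stand-in for the per-job work (db.queueArchive in the commit).
void queueOneJob(size_t i) {
  std::cout << "queued job " << i << std::endl;
}

int main() {
  const size_t jobCount = 100;  // placeholder, mirrors filesToDo in the test
  std::list<std::function<void()>> lambdas;
  std::list<std::future<void>> jobInsertions;
  for (size_t i = 0; i < jobCount; i++) {
    // Each job captures its index by value; shared state (the db and
    // log context in the commit) would be captured by reference.
    lambdas.emplace_back([i]() { queueOneJob(i); });
    // std::launch::async forces each task to run as if on its own thread.
    // std::async decay-copies the callable, so the list mainly keeps the
    // lambdas addressable, mirroring the structure used in the commit.
    jobInsertions.emplace_back(std::async(std::launch::async, lambdas.back()));
  }
  // Block until every insertion has completed.
  for (auto &j : jobInsertions) { j.wait(); }
  jobInsertions.clear();
  lambdas.clear();
  return 0;
}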
@@ -130,47 +130,57 @@ const cta::common::dataStructures::SecurityIdentity SchedulerDatabaseTest::s_use
// unit test is disabled as it is pretty long to run.
TEST_P(SchedulerDatabaseTest, DISABLED_createManyArchiveJobs) {
using namespace cta;
- log::DummyLogger dl("");
- log::LogContext lc(dl);
+ cta::log::DummyLogger dl("");
+ cta::log::LogContext lc(dl);
- SchedulerDatabase &db = getDb();
+ cta::SchedulerDatabase &db = getDb();
// Inject 1000 archive jobs to the db.
- cta::common::dataStructures::ArchiveFileQueueCriteria afqc;
- afqc.copyToPoolMap.insert({1, "tapePool"});
- afqc.fileId = 0;
- afqc.mountPolicy.name = "mountPolicy";
- afqc.mountPolicy.archivePriority = 1;
- afqc.mountPolicy.archiveMinRequestAge = 0;
- afqc.mountPolicy.retrievePriority = 1;
- afqc.mountPolicy.retrieveMinRequestAge = 0;
- afqc.mountPolicy.maxDrivesAllowed = 10;
- afqc.mountPolicy.creationLog = { "u", "h", time(nullptr)};
- afqc.mountPolicy.lastModificationLog = { "u", "h", time(nullptr)};
- afqc.mountPolicy.comment = "comment";
const size_t filesToDo = 100;
+ std::list<std::future<void>> jobInsertions;
+ std::list<std::function<void()>> lambdas;
for (size_t i=0; i<filesToDo; i++) {
- cta::common::dataStructures::ArchiveRequest ar;
- ar.archiveReportURL="";
- ar.checksumType="";
- ar.creationLog = { "user", "host", time(nullptr)};
- uuid_t fileUUID;
- uuid_generate(fileUUID);
- char fileUUIDStr[37];
- uuid_unparse(fileUUID, fileUUIDStr);
- ar.diskFileID = fileUUIDStr;
- ar.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
- ar.diskFileInfo.owner = "user";
- ar.diskFileInfo.group = "group";
- // Attempt to create a non-valid UTF8 string.
- ar.diskFileInfo.recoveryBlob = std::string("recoveryBlob") + "\xc3\xb1";
- ar.fileSize = 1000;
- ar.requester = { "user", "group" };
- ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
- ar.storageClass = "storageClass";
- afqc.fileId = i;
- db.queueArchive("eosInstance", ar, afqc, lc);
+ lambdas.emplace_back(
+ [i,&db,&lc](){
+ cta::common::dataStructures::ArchiveRequest ar;
+ cta::log::LogContext locallc=lc;
+ cta::common::dataStructures::ArchiveFileQueueCriteria afqc;
+ afqc.copyToPoolMap.insert({1, "tapePool"});
+ afqc.fileId = 0;
+ afqc.mountPolicy.name = "mountPolicy";
+ afqc.mountPolicy.archivePriority = 1;
+ afqc.mountPolicy.archiveMinRequestAge = 0;
+ afqc.mountPolicy.retrievePriority = 1;
+ afqc.mountPolicy.retrieveMinRequestAge = 0;
+ afqc.mountPolicy.maxDrivesAllowed = 10;
+ afqc.mountPolicy.creationLog = { "u", "h", time(nullptr)};
+ afqc.mountPolicy.lastModificationLog = { "u", "h", time(nullptr)};
+ afqc.mountPolicy.comment = "comment";
+ afqc.fileId = i;
+ ar.archiveReportURL="";
+ ar.checksumType="";
+ ar.creationLog = { "user", "host", time(nullptr)};
+ uuid_t fileUUID;
+ uuid_generate(fileUUID);
+ char fileUUIDStr[37];
+ uuid_unparse(fileUUID, fileUUIDStr);
+ ar.diskFileID = fileUUIDStr;
+ ar.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
+ ar.diskFileInfo.owner = "user";
+ ar.diskFileInfo.group = "group";
+ // Attempt to create a non-valid UTF8 string.
+ ar.diskFileInfo.recoveryBlob = std::string("recoveryBlob") + "\xc3\xb1";
+ ar.fileSize = 1000;
+ ar.requester = { "user", "group" };
+ ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
+ ar.storageClass = "storageClass";
+ db.queueArchive("eosInstance", ar, afqc, locallc);
+ });
+ jobInsertions.emplace_back(std::async(std::launch::async | std::launch::deferred,lambdas.back()));
}
+ for (auto &j: jobInsertions) { j.wait(); }
+ jobInsertions.clear();
+ lambdas.clear();
// Then load all archive jobs into memory
// Create mount.
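A note on the uuid calls that appear on both sides of this hunk: uuid_generate and uuid_unparse come from libuuid (<uuid/uuid.h>), and uuid_unparse writes the 36-character textual form of the UUID plus a terminating NUL, which is why the buffer is declared as char[37].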
@@ -196,29 +206,49 @@ TEST_P(SchedulerDatabaseTest, DISABLED_createManyArchiveJobs) {
am.reset(nullptr);
moutInfo.reset(nullptr);
- const size_t filesToDo2 = 1000;
+ const size_t filesToDo2 = 200;
for (size_t i=0; i<filesToDo2; i++) {
- cta::common::dataStructures::ArchiveRequest ar;
- ar.archiveReportURL="";
- ar.checksumType="";
- ar.creationLog = { "user", "host", time(nullptr)};
- uuid_t fileUUID;
- uuid_generate(fileUUID);
- char fileUUIDStr[37];
- uuid_unparse(fileUUID, fileUUIDStr);
- ar.diskFileID = fileUUIDStr;
- ar.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
- ar.diskFileInfo.owner = "user";
- ar.diskFileInfo.group = "group";
- // Attempt to create a non-valid UTF8 string.
- ar.diskFileInfo.recoveryBlob = std::string("recoveryBlob") + "\xc3\xb1";
- ar.fileSize = 1000;
- ar.requester = { "user", "group" };
- ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
- ar.storageClass = "storageClass";
- afqc.fileId = i;
- db.queueArchive("eosInstance", ar, afqc, lc);
- }
+ lambdas.emplace_back(
+ [i,&db,&lc](){
+ cta::common::dataStructures::ArchiveRequest ar;
+ cta::log::LogContext locallc=lc;
+ cta::common::dataStructures::ArchiveFileQueueCriteria afqc;
+ afqc.copyToPoolMap.insert({1, "tapePool"});
+ afqc.fileId = 0;
+ afqc.mountPolicy.name = "mountPolicy";
+ afqc.mountPolicy.archivePriority = 1;
+ afqc.mountPolicy.archiveMinRequestAge = 0;
+ afqc.mountPolicy.retrievePriority = 1;
+ afqc.mountPolicy.retrieveMinRequestAge = 0;
+ afqc.mountPolicy.maxDrivesAllowed = 10;
+ afqc.mountPolicy.creationLog = { "u", "h", time(nullptr)};
+ afqc.mountPolicy.lastModificationLog = { "u", "h", time(nullptr)};
+ afqc.mountPolicy.comment = "comment";
+ afqc.fileId = i;
+ ar.archiveReportURL="";
+ ar.checksumType="";
+ ar.creationLog = { "user", "host", time(nullptr)};
+ uuid_t fileUUID;
+ uuid_generate(fileUUID);
+ char fileUUIDStr[37];
+ uuid_unparse(fileUUID, fileUUIDStr);
+ ar.diskFileID = fileUUIDStr;
+ ar.diskFileInfo.path = std::string("/uuid/")+fileUUIDStr;
+ ar.diskFileInfo.owner = "user";
+ ar.diskFileInfo.group = "group";
+ // Attempt to create a non-valid UTF8 string.
+ ar.diskFileInfo.recoveryBlob = std::string("recoveryBlob") + "\xc3\xb1";
+ ar.fileSize = 1000;
+ ar.requester = { "user", "group" };
+ ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
+ ar.storageClass = "storageClass";
+ db.queueArchive("eosInstance", ar, afqc, locallc);
+ });
+ jobInsertions.emplace_back(std::async(std::launch::async,lambdas.back()));
+ }
+ for (auto &j: jobInsertions) { j.wait(); }
+ jobInsertions.clear();
+ lambdas.clear();
// Then load all archive jobs into memory (2nd pass)
// Create mount.
......
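One detail worth noting between the two hunks: the first batch is launched with std::launch::async | std::launch::deferred, which is the default std::async policy and lets the implementation defer a task until its future is waited on, while the second batch passes std::launch::async alone, forcing each insertion to run as if on its own thread.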