Commit fccc6c2d authored by Eric Cano's avatar Eric Cano
Browse files

#450: Implemented the queueing of transformed subrequests.

Updated the unit tests to validate the queues and requests are as expected.
parent 81b9c125
......@@ -1025,7 +1025,7 @@ RetrieveRequest::AsyncRetrieveToArchiveTransformer * RetrieveRequest::asyncTrans
auto maxReportRetries = retrieveRequestPayload.jobs(0).maxreportretries();
for(auto cntr: repackInfoSerDeser.copyNbsToRearchive) {
auto *archiveJob = archiveRequestPayload.add_jobs();
archiveJob->set_status(cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser);
archiveJob->set_status(cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForRepack);
archiveJob->set_copynb(cntr);
archiveJob->set_archivequeueaddress("");
archiveJob->set_totalreportretries(0);
......
......@@ -1608,6 +1608,11 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
// 2) We should async transform the retrieve requests into archive requests.
// From this point on, failing to transform is counted as a failure to archive.
struct SuccessfullyTranformedRequest {
std::shared_ptr<objectstore::ArchiveRequest> archiveRequest;
SubrequestInfo & subrequestInfo;
};
std::list<SuccessfullyTranformedRequest> successfullyTransformedSubrequests;
{
objectstore::RepackRequest::SubrequestStatistics::List failedArchiveSSL;
std::list<SubrequestInfo *> failedSubrequests;
......@@ -1656,6 +1661,12 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
params.add("fileId", atar.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", atar.subrequestInfo.subrequest->getAddressIfSet());
lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(), turned successful retrieve request in archive request.");
successfullyTransformedSubrequests.push_back(SuccessfullyTranformedRequest{
std::make_shared<objectstore::ArchiveRequest>(
atar.subrequestInfo.subrequest->getAddressIfSet(),
m_oStoreDb.m_objectStore),
atar.subrequestInfo
});
} catch (exception::Exception & ex) {
// We failed to archive the file (to create the request, in fact). So all the copyNbs
// can be counted as failed.
......@@ -1741,7 +1752,18 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
}
// 3. We now just need to queue the freshly created archive jobs into their respective queues
// XXX: TODO
{
objectstore::Sorter sorter(*m_oStoreDb.m_agentReference, m_oStoreDb.m_objectStore, m_oStoreDb.m_catalogue);
std::list<std::unique_ptr<objectstore::ScopedExclusiveLock>> locks;
// TODO: swich to "lockfree" sorter interface.
for (auto &sts: successfullyTransformedSubrequests) {
locks.push_back(cta::make_unique<objectstore::ScopedExclusiveLock>(*sts.archiveRequest));
sts.archiveRequest->fetch();
sorter.insertArchiveRequest(sts.archiveRequest, *m_oStoreDb.m_agentReference, lc);
}
locks.clear();
sorter.flushAll(lc);
}
}
//------------------------------------------------------------------------------
......
......@@ -41,6 +41,7 @@
#include "common/Timer.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "objectstore/Algorithms.hpp"
#include "common/range.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
......@@ -1607,102 +1608,49 @@ TEST_P(SchedulerTest, expandRepackRequest) {
}
}
}
archiveFileId = 1;
std::vector<std::string> retrieveRequestsAddresses;
for(uint64_t i = 1; i<= nbTapesForTest ;++i)
// Now get the reports from the DB a check they are in the right queues
{
//We will now test the getNextRetrieveRequestToReportToRepackForSuccessBatch method that
//pop all the RetrieveRequest from the RetrieveQueueToReportToRepackForSuccess queue
auto listSucceededRetrieveRequests = schedulerDB.getNextRetrieveRequestToReportToRepackForSuccessBatch(nbArchiveFilesPerTape,lc);
{
ASSERT_EQ(listSucceededRetrieveRequests.size(),nbArchiveFilesPerTape);
int j = 1;
for (auto &retrieveRequest: listSucceededRetrieveRequests)
{
//Lock and fetch the RetrieveRequest
objectstore::ScopedExclusiveLock rReqL(*retrieveRequest);
retrieveRequest->fetch();
//initialize variable for the test
uint32_t copyNb = retrieveRequest->getActiveCopyNumber();
common::dataStructures::ArchiveFile archiveFile = retrieveRequest->getArchiveFile();
common::dataStructures::TapeFile tapeFile = archiveFile.tapeFiles[copyNb];
std::string currentVid = allVid.at(i-1);
//Testing RetrieveRequest
common::dataStructures::RetrieveRequest schedulerRetrieveRequest = retrieveRequest->getSchedulerRequest();
// TODO ASSERT_EQ(retrieveRequest->getTapePool(),s_tapePoolName);
ASSERT_EQ(retrieveRequest->getJobs().size(),1);
ASSERT_EQ(retrieveRequest->getLastActiveVid(),currentVid);
ASSERT_EQ(retrieveRequest->getQueueType(),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess);
//Testing tape file
ASSERT_EQ(tapeFile.vid,currentVid);
ASSERT_EQ(tapeFile.blockId,j * 100);
ASSERT_EQ(tapeFile.fSeq,j);
ASSERT_EQ(tapeFile.checksumType, checksumType);
ASSERT_EQ(tapeFile.checksumValue,checksumValue);
ASSERT_EQ(tapeFile.compressedSize, compressedFileSize);
//Testing scheduler retrieve request
ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId++);
std::stringstream ss;
ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j;
ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str());
// TODO ASSERT_EQ(schedulerRetrieveRequest.isRepack,true);
// TODO ASSERT_EQ(schedulerRetrieveRequest.tapePool,s_tapePoolName);
std::ostringstream diskFilePath;
diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j;
ASSERT_EQ(schedulerRetrieveRequest.diskFileInfo.path,diskFilePath.str());
//Testing the retrieve request
// TODO ASSERT_EQ(schedulerRetrieveRequest.isRepack,true);
//Testing the archive file associated to the retrieve request
ASSERT_EQ(archiveFile.storageClass,storageClass.name);
ASSERT_EQ(archiveFile.diskInstance,storageClass.diskInstance);
++j;
}
while (true) {
auto rep = schedulerDB.getNextRepackReportBatch(lc);
if (nullptr == rep) break;
rep->report(lc);
}
//Now, we will transform the RetrieveRequests into ArchiveRequest
{
//Now, we will transform the RetrieveRequests into ArchiveRequest
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest::AsyncRetrieveToArchiveTransformer>> transformers;
for (auto &retrieveRequest: listSucceededRetrieveRequests) {
retrieveRequestsAddresses.push_back(retrieveRequest->getAddressIfSet());
transformers.emplace_back(retrieveRequest->asyncTransformToArchiveRequest(*agentReference));
}
//Wait for all the asynchronous transformations
for (auto &transformer: transformers) {
transformer->wait();
}
// All the retrieve requests should be gone and replaced by archive requests.
cta::objectstore::RootEntry re(schedulerDB.getBackend());
re.fetchNoLock();
typedef cta::objectstore::JobQueueType QueueType;
for (auto queueType: {QueueType::FailedJobs, QueueType::JobsToReportToRepackForFailure, QueueType::JobsToReportToRepackForSuccess,
QueueType::JobsToReportToUser, QueueType::JobsToTransferForRepack, QueueType::JobsToTransferForUser}) {
ASSERT_EQ(0, re.dumpRetrieveQueues(queueType).size());
}
}
//Testing that the RetrieveQueueToReportToRepackForSuccess is empty
ASSERT_EQ(schedulerDB.getNextRetrieveRequestToReportToRepackForSuccessBatch(nbArchiveFilesPerTape,lc).size(),0);
//Testing the new ArchiveRequests contains the same data as the previous RetrieveRequest
archiveFileId = 1;
int retrieveRequestAddressIndex = 0;
std::map<uint64_t,std::list<cta::objectstore::ArchiveRequest>> archiveRequestsPerTape;
for(uint64_t i = 1; i<= nbTapesForTest ;++i)
{
for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j)
{
cta::objectstore::ArchiveRequest archiveRequest(retrieveRequestsAddresses.at(retrieveRequestAddressIndex),schedulerDB.getBackend());
archiveRequestsPerTape[i].emplace_back(archiveRequest);
cta::objectstore::ScopedExclusiveLock arLock(archiveRequest);
archiveRequest.fetch();
ASSERT_EQ(1, re.dumpArchiveQueues(QueueType::JobsToTransferForRepack).size());
for (auto queueType: {QueueType::FailedJobs, QueueType::JobsToReportToRepackForFailure, QueueType::JobsToReportToRepackForSuccess,
QueueType::JobsToReportToUser, QueueType::JobsToTransferForUser}) {
ASSERT_EQ(0, re.dumpArchiveQueues(queueType).size());
}
// Now check we find all our requests in the archive queue.
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(s_tapePoolName, cta::objectstore::JobQueueType::JobsToTransferForRepack),
schedulerDB.getBackend());
aq.fetchNoLock();
std::set<uint64_t> archiveIdsSeen;
for (auto aqj: aq.dumpJobs()) {
cta::objectstore::ArchiveRequest ar(aqj.address, schedulerDB.getBackend());
ar.fetchNoLock();
common::dataStructures::ArchiveFile archiveFile = ar.getArchiveFile();
// ArchiveFileId are 1-10 for tape 1 and 11-20 for tape 2.
uint64_t tapeIndex = (archiveFile.archiveFileID - 1) / nbArchiveFilesPerTape + 1;
uint64_t fileIndex = (archiveFile.archiveFileID - 1) % nbArchiveFilesPerTape + 1;
ASSERT_LE(1, tapeIndex);
ASSERT_GE(nbTapesForTest, tapeIndex);
ASSERT_LE(1, fileIndex);
ASSERT_GE(nbArchiveFilesPerTape, fileIndex);
//Test the ArchiveRequest
common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile();
ASSERT_EQ(archiveFile.archiveFileID,archiveFileId);
ASSERT_EQ(archiveFile.checksumType,checksumType);
ASSERT_EQ(archiveFile.checksumValue,checksumValue);
std::ostringstream diskFilePath;
diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j;
diskFilePath << "/public_dir/public_file_"<<tapeIndex<<"_"<<fileIndex;
std::ostringstream diskFileId;
diskFileId << (12345677 + archiveFileId);
diskFileId << (12345677 + archiveFile.archiveFileID);
ASSERT_EQ(archiveFile.diskFileId,diskFileId.str());
ASSERT_EQ(archiveFile.diskFileInfo.path,diskFilePath.str());
ASSERT_EQ(archiveFile.diskFileInfo.group,diskFileGroup);
......@@ -1711,35 +1659,20 @@ TEST_P(SchedulerTest, expandRepackRequest) {
ASSERT_EQ(archiveFile.fileSize,archiveFileSize);
ASSERT_EQ(archiveFile.storageClass,s_storageClassName);
std::stringstream ss;
ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j;
ASSERT_EQ(archiveRequest.getSrcURL(),ss.str());
for(auto archiveJob : archiveRequest.dumpJobs()){
ASSERT_EQ(archiveJob.status,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser);
ASSERT_TRUE(archiveJob.owner.find(agentReferenceName) != std::string::npos);
ss<<"root://repackData/buffer/"<<allVid.at(tapeIndex-1)<<"/"<<std::setw(9)<<std::setfill('0')<<fileIndex;
ASSERT_EQ(ar.getSrcURL(),ss.str());
for(auto archiveJob : ar.dumpJobs()){
ASSERT_EQ(archiveJob.status,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForRepack);
ASSERT_EQ(aq.getAddressIfSet(), archiveJob.owner);
}
++retrieveRequestAddressIndex;
++archiveFileId;
archiveIdsSeen.insert(archiveFile.archiveFileID);
}
// Validate we got all the files we expected.
ASSERT_EQ(20, archiveIdsSeen.size());
for (auto ai: cta::range<uint64_t>(1,20)) {
ASSERT_EQ(1, archiveIdsSeen.count(ai));
}
}
//queue all the ArchiveRequests into the ArchiveQueueToTransferForRepack queue.
for(uint64_t i = 1; i <= nbTapesForTest; ++i){
scheduler.queueArchiveRequestForRepackBatch(archiveRequestsPerTape[i],lc);
}
scheduler.waitSchedulerDbSubthreadsComplete();
//Test that the ArchiveRequests are in the ArchiveQueueToTransferForRepack queue
/*cta::objectstore::RootEntry re(schedulerDB.getBackend());
cta::objectstore::ScopedExclusiveLock sel(re);
re.fetch();
//Get the retrieveQueueToReportToRepackForSuccess
std::string archiveQueueToTransferForRepack = re.getArchiveQueueAddress(s_tapePoolName,cta::objectstore::JobQueueType::JobsToTransferForRepack);
cta::objectstore::ArchiveQueue aq(archiveQueueToTransferForRepack,schedulerDB.getBackend());
//Fetch the queue so that we can get the retrieveRequests from it
cta::objectstore::ScopedExclusiveLock rql(aq);
aq.fetch();*/
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment