Commit 6a6374e7 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Transformed the RetrieveRequests from Repack to ArchiveRequests at ObjectStore level + unit tests

Removed the "User" message from the cta.proto (duplication of UserIdentity message)
parent 4f3a1fee
......@@ -49,7 +49,7 @@ struct RetrieveRequest {
DiskFileInfo diskFileInfo;
EntryLog creationLog;
bool isRepack;
std::string tapePool;
}; // struct RetrieveRequest
std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj);
......
......@@ -22,4 +22,4 @@ namespace cta { namespace objectstore {
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess>::c_containerTypeName = "RetrieveQueueToReportToRepackForSuccess";
}
}
}
\ No newline at end of file
......@@ -58,6 +58,7 @@ void RetrieveRequest::initialize() {
m_payload.set_failurereportlog("");
m_payload.set_failurereporturl("");
m_payload.set_isrepack(false);
m_payload.set_tapepool("");
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
......@@ -401,6 +402,7 @@ cta::common::dataStructures::RetrieveRequest RetrieveRequest::getSchedulerReques
el.deserialize(m_payload.schedulerrequest().entrylog());
ret.dstURL = m_payload.schedulerrequest().dsturl();
ret.isRepack = m_payload.isrepack();
ret.tapePool = m_payload.tapepool();
ret.errorReportURL = m_payload.schedulerrequest().retrieveerrorreporturl();
objectstore::DiskFileInfoSerDeser dfisd;
dfisd.deserialize(m_payload.schedulerrequest().diskfileinfo());
......@@ -713,6 +715,7 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint16_t copyNumber, const std::string
retRef.m_retrieveRequest.diskFileInfo = dfi;
retRef.m_retrieveRequest.dstURL = payload.schedulerrequest().dsturl();
retRef.m_retrieveRequest.isRepack = payload.isrepack();
retRef.m_retrieveRequest.tapePool = payload.tapepool();
retRef.m_retrieveRequest.errorReportURL = payload.schedulerrequest().retrieveerrorreporturl();
retRef.m_retrieveRequest.requester.name = payload.schedulerrequest().requester().name();
retRef.m_retrieveRequest.requester.group = payload.schedulerrequest().requester().group();
......@@ -822,7 +825,7 @@ void RetrieveRequest::AsyncJobDeleter::wait() {
}
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncJobSucceedForRepackReporter::asyncReportSucceedForRepack()
// RetrieveRequest::asyncReportSucceedForRepack()
//------------------------------------------------------------------------------
RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReportSucceedForRepack(uint64_t copyNb)
{
......@@ -873,6 +876,107 @@ void RetrieveRequest::AsyncJobSucceedForRepackReporter::wait(){
m_backendUpdater->wait();
}
//------------------------------------------------------------------------------
// RetrieveRequest::asyncTransformToArchiveRequest()
//------------------------------------------------------------------------------
// Builds an asynchronous updater that rewrites this object's serialized
// representation in the object store: the RetrieveRequest payload is converted
// into an ArchiveRequest payload and the object header type is switched to
// ArchiveRequest_t. The header owner is left unchanged, so the new
// ArchiveRequest stays owned by the former RetrieveRequest owner.
// @param processAgent agent of the calling process; its address becomes the
//        owner of every job of the new ArchiveRequest.
// @return a new AsyncRetrieveToArchiveTransformer (caller takes ownership);
//         call wait() on it to block until the update has completed.
RetrieveRequest::AsyncRetrieveToArchiveTransformer * RetrieveRequest::asyncTransformToArchiveRequest(AgentReference& processAgent){
  std::unique_ptr<AsyncRetrieveToArchiveTransformer> ret(new AsyncRetrieveToArchiveTransformer);
  std::string processAgentAddress = processAgent.getAgentAddress();
  ret->m_updaterCallback = [processAgentAddress](const std::string &in)->std::string{
    // We have a locked and fetched object, so we just need to work on its representation.
    cta::objectstore::serializers::ObjectHeader oh;
    if (!oh.ParseFromString(in)) {
      // Use the tolerant parser to assess the situation.
      oh.ParsePartialFromString(in);
      throw cta::exception::Exception(std::string("In RetrieveRequest::asyncTransformToArchiveRequest(): could not parse header: ")+
        oh.InitializationErrorString());
    }
    if (oh.type() != serializers::ObjectType::RetrieveRequest_t) {
      std::stringstream err;
      err << "In RetrieveRequest::asyncTransformToArchiveRequest()::lambda(): wrong object type: " << oh.type();
      throw cta::exception::Exception(err.str());
    }
    serializers::RetrieveRequest retrieveRequestPayload;
    if (!retrieveRequestPayload.ParseFromString(oh.payload())) {
      // Use the tolerant parser to assess the situation.
      retrieveRequestPayload.ParsePartialFromString(oh.payload());
      throw cta::exception::Exception(std::string("In RetrieveRequest::asyncTransformToArchiveRequest(): could not parse payload: ")+
        retrieveRequestPayload.InitializationErrorString());
    }
    // Create the archive request from the RetrieveRequest
    serializers::ArchiveRequest archiveRequestPayload;
    const cta::objectstore::serializers::ArchiveFile& archiveFile = retrieveRequestPayload.archivefile();
    archiveRequestPayload.set_archivefileid(archiveFile.archivefileid());
    archiveRequestPayload.set_checksumtype(archiveFile.checksumtype());
    archiveRequestPayload.set_checksumvalue(archiveFile.checksumvalue());
    archiveRequestPayload.set_creationtime(archiveFile.creationtime()); //TODO : should the creation time be the same as the archiveFile creation time ?
    archiveRequestPayload.set_diskfileid(archiveFile.diskfileid());
    archiveRequestPayload.set_diskinstance(archiveFile.diskinstance());
    archiveRequestPayload.set_filesize(archiveFile.filesize());
    archiveRequestPayload.set_reconcilationtime(archiveFile.reconciliationtime());
    archiveRequestPayload.set_storageclass(archiveFile.storageclass());
    archiveRequestPayload.set_archiveerrorreporturl("");//No archive error report URL
    archiveRequestPayload.set_archivereporturl("");//No archive report URL
    archiveRequestPayload.set_reportdecided(false);//TODO : should we put it as false ?
    // Copy disk file information into the new ArchiveRequest
    cta::objectstore::serializers::DiskFileInfo *archiveRequestDFI = archiveRequestPayload.mutable_diskfileinfo();
    archiveRequestDFI->CopyFrom(archiveFile.diskfileinfo());
    //ArchiveRequest source url is the same as the retrieveRequest destination URL
    //(taken by const reference: the previous by-value copy of the message was needless)
    const cta::objectstore::serializers::SchedulerRetrieveRequest &schedulerRetrieveRequest = retrieveRequestPayload.schedulerrequest();
    archiveRequestPayload.set_srcurl(schedulerRetrieveRequest.dsturl());
    cta::objectstore::serializers::UserIdentity *archiveRequestUser = archiveRequestPayload.mutable_requester();
    archiveRequestUser->CopyFrom(schedulerRetrieveRequest.requester());
    //Copy the RetrieveRequest MountPolicy into the new ArchiveRequest
    cta::objectstore::serializers::MountPolicy *archiveRequestMP = archiveRequestPayload.mutable_mountpolicy();
    const cta::objectstore::serializers::MountPolicy& retrieveRequestMP = retrieveRequestPayload.mountpolicy();
    archiveRequestMP->CopyFrom(retrieveRequestMP);
    //TODO : Should creation log just be initialized or should it be copied from the retrieveRequest ?
    cta::objectstore::serializers::EntryLog *archiveRequestCL = archiveRequestPayload.mutable_creationlog();
    archiveRequestCL->CopyFrom(retrieveRequestMP.creationlog());
    //Add the jobs of the old RetrieveRequest to the new ArchiveRequest
    //(iterate by const reference: each job message was previously copied per iteration)
    for(const auto &retrieveJob: retrieveRequestPayload.jobs()){
      auto *archiveJob = archiveRequestPayload.add_jobs();
      archiveJob->set_status(cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransfer);
      archiveJob->set_copynb(retrieveJob.copynb());
      archiveJob->set_archivequeueaddress("");
      archiveJob->set_totalreportretries(0); //(was set twice in an earlier revision; one call suffices)
      archiveJob->set_lastmountwithfailure(0);
      archiveJob->set_totalretries(0);
      archiveJob->set_retrieswithinmount(0);
      archiveJob->set_maxretrieswithinmount(retrieveJob.maxretrieswithinmount()); //TODO : should we put the same value as the retrieveJob ?
      archiveJob->set_maxtotalretries(retrieveJob.maxtotalretries()); //TODO : should we put the same value as the retrieveJob ?
      archiveJob->set_maxreportretries(retrieveJob.maxreportretries()); //TODO : should we put the same value as the retrieveJob ?
      archiveJob->set_tapepool(retrieveRequestPayload.tapepool());
      archiveJob->set_owner(processAgentAddress);
    }
    //Serialize the new ArchiveRequest so that it replaces the RetrieveRequest
    oh.set_payload(archiveRequestPayload.SerializeAsString());
    //Change the type of the RetrieveRequest to ArchiveRequest
    oh.set_type(serializers::ObjectType::ArchiveRequest_t);
    //The header owner is intentionally left untouched: the new ArchiveRequest is
    //owned by the old RetrieveRequest owner (the Repack Request).
    //(The previous oh.set_owner(oh.owner()) call was a no-op and has been removed.)
    return oh.SerializeAsString();
  };
  ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(),ret->m_updaterCallback));
  return ret.release();
}
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncRetrieveToArchiveTransformer::wait()
//------------------------------------------------------------------------------
// Blocks until the backend asynchronous updater has finished applying the
// retrieve-to-archive transformation callback (delegates to the backend's
// AsyncUpdater::wait()).
void RetrieveRequest::AsyncRetrieveToArchiveTransformer::wait(){
m_backendUpdater->wait();
}
//------------------------------------------------------------------------------
// RetrieveRequest::getFailures()
//------------------------------------------------------------------------------
......@@ -919,4 +1023,14 @@ void RetrieveRequest::setIsRepack(bool isRepack){
m_payload.set_isrepack(isRepack);
}
// Returns the tape pool name stored in this request's payload (populated for
// repack-driven retrieves via setTapePool). Requires a readable (fetched)
// payload, as enforced by checkPayloadReadable().
std::string RetrieveRequest::getTapePool(){
checkPayloadReadable();
return m_payload.tapepool();
}
// Stores the destination tape pool name in this request's payload. Requires a
// writable payload, as enforced by checkPayloadWritable().
// NOTE(review): tapePool is taken by const value; a const reference would avoid
// a copy, but the matching declaration in the header would need the same change.
void RetrieveRequest::setTapePool(const std::string tapePool)
{
checkPayloadWritable();
m_payload.set_tapepool(tapePool);
}
}} // namespace cta::objectstore
......@@ -30,6 +30,7 @@
#include "common/dataStructures/ArchiveFile.hpp"
#include "common/dataStructures/RetrieveRequest.hpp"
#include "common/dataStructures/RetrieveFileQueueCriteria.hpp"
#include "AgentReference.hpp"
namespace cta {
namespace objectstore {
......@@ -80,16 +81,38 @@ public:
std::function<std::string(const std::string &)> m_updaterCallback;
};
/**
* Holds the two pieces of an asynchronous RetrieveRequest -> ArchiveRequest
* transformation: the updater callback that performs the payload rewrite, and
* the backend asynchronous updater that executes it. Instances are created by
* RetrieveRequest::asyncTransformToArchiveRequest(); only RetrieveRequest can
* populate the private members (friend class).
*/
class AsyncRetrieveToArchiveTransformer{
friend class RetrieveRequest;
public:
//Blocks until the backend asynchronous update has completed
void wait();
private:
//Holds the AsyncUpdater that will run m_updaterCallback asynchronously
std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater;
//Callback executed by the AsyncUpdater on the object's serialized representation
std::function<std::string(const std::string &)> m_updaterCallback;
};
/**
* Asynchronously report the RetrieveJob corresponding to the copyNb parameter
* as RJS_Success
* @param copyNb the copyNb corresponding to the RetrieveJob we want to report as
* RJS_Succeeded
* @return the class that is Reponsible to save the updater callback
* @return the class that is Responsible to save the updater callback
* and the backend async updater (responsible for executing asynchronously the updater callback
*/
AsyncJobSucceedForRepackReporter * asyncReportSucceedForRepack(uint64_t copyNb);
/**
* Asynchronously transform the current RetrieveRequest into an ArchiveRequest
* @param processAgent : The agent of the process that will transform the RetrieveRequest into an ArchiveRequest
* @return the class that is Responsible to save the updater callback and the backend async updater.
*/
AsyncRetrieveToArchiveTransformer * asyncTransformToArchiveRequest(AgentReference& processAgent);
JobDump getJob(uint16_t copyNb);
std::list<JobDump> getJobs();
bool addJobFailure(uint16_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc);
......@@ -122,6 +145,8 @@ public:
};
bool isRepack();
void setIsRepack(bool isRepack);
std::string getTapePool();
void setTapePool(const std::string tapePool);
private:
/*!
......
......@@ -248,11 +248,6 @@ message SchedulerGlobalLock {
required uint64 nextmountid = 8000;
}
message User {
required string name = 8800;
required string group = 8810;
}
message EntryLog {
required string username = 8950;
required string host = 8960;
......@@ -316,7 +311,7 @@ message ArchiveRequest {
required string archivereporturl = 9057;
required string archiveerrorreporturl = 9058;
required uint64 filesize = 9060;
required User requester = 9070;
required UserIdentity requester = 9070;
required string srcurl = 9080;
required string storageclass = 9090;
required EntryLog creationlog = 9091;
......@@ -372,6 +367,7 @@ message RetrieveRequest {
required string failurereporturl = 9155;
required string failurereportlog = 9156;
required bool isrepack = 9157; //In protobuf, default values for bool is false
optional string tapepool = 9158;
}
message ValueCountPair {
......
......@@ -843,6 +843,7 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
rReq->initialize();
rReq->setSchedulerRequest(rqst);
rReq->setRetrieveFileQueueCriteria(criteria);
rReq->setTapePool(rqst.tapePool);
// Find the job corresponding to the vid (and check we indeed have one).
auto jobs = rReq->getJobs();
objectstore::RetrieveRequest::JobDump job;
......@@ -869,6 +870,7 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
// "Select" an arbitrary copy number. This is needed to serialize the object.
rReq->setActiveCopyNumber(criteria.archiveFile.tapeFiles.begin()->second.copyNb);
rReq->setIsRepack(rqst.isRepack);
rReq->setTapePool(rqst.tapePool);
rReq->insert();
double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
m_taskQueueSize++;
......@@ -3155,9 +3157,9 @@ void OStoreDB::RetrieveJob::checkReportSucceedForRepack(){
m_jobSucceedForRepackReporter->wait();
}
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> OStoreDB::getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc)
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> OStoreDB::getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc)
{
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> ret;
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> ret;
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> Carqtrtrfs;
Carqtrtrfs algo(this->m_objectStore, *m_agentReference);
// Decide from which queue we are going to pop.
......@@ -3174,12 +3176,8 @@ std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> OStoreDB::getNex
if(jobs.elements.empty()) continue;
for(auto &j : jobs.elements)
{
std::unique_ptr<OStoreDB::RetrieveJob> rj(new OStoreDB::RetrieveJob(j.retrieveRequest->getAddressIfSet(), *this, nullptr));
rj->archiveFile = j.archiveFile;
rj->retrieveRequest = j.rr;
rj->selectedCopyNb = j.copyNb;
rj->setJobOwned();
ret.emplace_back(std::move(rj));
//TODO : If the retrieve request has more than one job, it will be inserted. Should we filter it ?
ret.emplace_back(std::move(j.retrieveRequest));
}
return ret;
}
......
......@@ -309,7 +309,13 @@ public:
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) override;
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) override;
/**
* Return the list of all RetrieveRequests that are in the RetrieveQueueToReportToRepackForSuccess
* @param filesRequested : The number of files we would like to return
* @param lc
* @return The list of all RetrieveRequests that are queued in the RetrieveQueueToReportToRepackForSuccess
*/
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) override;
/* === Repack requests handling =========================================== */
void queueRepack(const std::string& vid, const std::string& bufferURL,
......
......@@ -146,7 +146,7 @@ public:
return m_OStoreDB.getNextRetrieveJobsFailedBatch(filesRequested, lc);
}
std::list<std::unique_ptr<RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) override {
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) override {
return m_OStoreDB.getNextSucceededRetrieveRequestForRepackBatch(filesRequested,lc);
}
......
......@@ -210,6 +210,7 @@ getQueueJobs(const jobQueue_t &jobQueueChunk)
job.tapeCopies[tf.second.vid].second = tf.second;
job.request.dstURL = osrr.first.getSchedulerRequest().dstURL;
job.request.archiveFileID = osrr.first.getArchiveFile().archiveFileID;
job.request.tapePool = osrr.first.getTapePool();
job.fileSize = osrr.first.getArchiveFile().fileSize;
m_jobCache.push_back(job);
......
......@@ -410,6 +410,16 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
uint64_t fseq = c_defaultFseqForRepack;
std::list<common::dataStructures::ArchiveFile> files;
auto vid = repackRequest->getRepackInfo().vid;
//We need to get the ArchiveRoutes to allow the retrieval of the tapePool in which the
//tape where the file is is located
std::list<common::dataStructures::ArchiveRoute> routes = m_catalogue.getArchiveRoutes();
//To identify the routes, we need to have both the disk instance name and the storage class name
//thus, the key of the map is a pair of string
std::map<std::pair<std::string, std::string>,common::dataStructures::ArchiveRoute> mapRoutes;
for(auto route: routes){
//insert the route into the map to allow a quick retrieval
mapRoutes[std::make_pair(route.storageClassName,route.diskInstanceName)] = route;
}
while(true) {
files = m_catalogue.getFilesForRepack(vid,fseq,c_defaultMaxNbFilesForRepack);
for(auto &archiveFile : files)
......@@ -419,6 +429,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
retrieveRequest.diskFileInfo = archiveFile.diskFileInfo;
retrieveRequest.dstURL = generateRetrieveDstURL(archiveFile.diskFileInfo);
retrieveRequest.isRepack = true;
retrieveRequest.tapePool = mapRoutes[std::make_pair(archiveFile.storageClass,archiveFile.diskInstance)].tapePoolName;
queueRetrieve(archiveFile.diskInstance,retrieveRequest,lc);
}
if (files.size()) {
......@@ -1246,21 +1257,6 @@ getNextFailedRetrieveJobsBatch(uint64_t filesRequested, log::LogContext &logCont
return ret;
}
//------------------------------------------------------------------------------
// getNextSucceededRetrieveRequestForRepackBatch
//------------------------------------------------------------------------------
// Pops up to filesRequested succeeded repack retrieve jobs from the scheduler
// database and wraps each one in a scheduler-level RetrieveJob.
// @param filesRequested maximum number of jobs to return
// @param lc log context propagated to the database layer
// @return owning pointers to the wrapped jobs; each RetrieveJob keeps the
//         underlying database job alive through its m_dbJob member.
std::list<std::unique_ptr<RetrieveJob>> Scheduler::getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc)
{
std::list<std::unique_ptr<RetrieveJob>> ret;
//We need to go through the queues of SucceededRetrieveJobs
auto dbRet = m_db.getNextSucceededRetrieveRequestForRepackBatch(filesRequested,lc);
for(auto &j : dbRet)
{
//Wrap the database job in a scheduler RetrieveJob (positioning by fSeq)
ret.emplace_back(new RetrieveJob(nullptr, j->retrieveRequest,j->archiveFile, j->selectedCopyNb, PositioningMethod::ByFSeq));
//Transfer ownership of the database job into the scheduler job
ret.back()->m_dbJob.reset(j.release());
}
return ret;
}
//------------------------------------------------------------------------------
// reportArchiveJobsBatch
//------------------------------------------------------------------------------
......
......@@ -198,14 +198,6 @@ public:
std::list<cta::common::dataStructures::RepackInfo> getRepacks();
cta::common::dataStructures::RepackInfo getRepack(const std::string &vid);
/**
* Return the list of all RetrieveRequests that are in the RetrieveQueueToReportToRepackForSuccess
* @param nbRequests : The number of request we would like to return
* @param lc
* @return The list of all RetrieveRequests that are queued in the RetrieveQueueToReportToRepackForSuccess
*/
std::list<std::unique_ptr<RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t nbRequests, log::LogContext& lc);
void shrink(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &tapepool);
// removes extra tape copies from a specific pool(usually an "_2" pool)
......
......@@ -74,12 +74,13 @@ class TapeMount;
class TapeSession;
class UserIdentity;
class RepackRequest;
namespace objectstore{
class RetrieveRequest;
}
} // cta
namespace cta {
class ArchiveRequest;
/**
* Abstract class defining the interface to the database of a tape resource
* scheduler.
......@@ -433,7 +434,13 @@ public:
*/
virtual std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logContext) = 0;
virtual std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) = 0;
virtual std::list<std::unique_ptr<RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& logContext) = 0;
/**
* Return the list of all RetrieveRequests that are in the RetrieveQueueToReportToRepackForSuccess
* @param filesRequested : The number of files we would like to return
* @param lc
* @return The list of all RetrieveRequests that are queued in the RetrieveQueueToReportToRepackForSuccess
*/
virtual std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& logContext) = 0;
/*============ Label management: user side =================================*/
// TODO
......
......@@ -1091,11 +1091,17 @@ TEST_P(SchedulerTest, expandRepackRequest) {
auto &schedulerDB = getSchedulerDB();
setupDefaultCatalogue();
//cta::log::StdoutLogger dummyLogger("dummy","dummy");
cta::log::DummyLogger dummyLogger("dummy","dummy");
log::LogContext lc(dummyLogger);
//Create an agent to represent this test process
std::string agentReferenceName = "expandRepackRequestTest";
std::unique_ptr<objectstore::AgentReference> agentReference(new objectstore::AgentReference(agentReferenceName, dummyLogger));
const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
const bool disabledValue = false;
const bool fullValue = false;
......@@ -1103,6 +1109,9 @@ TEST_P(SchedulerTest, expandRepackRequest) {
cta::common::dataStructures::SecurityIdentity admin;
admin.username = "admin_user_name";
admin.host = "admin_host";
const std::string diskFileUser = "public_disk_user";
const std::string diskFileGroup = "public_disk_group";
const std::string diskFileRecoveryBlob = "opaque_disk_file_recovery_contents";
//Create a logical library in the catalogue
catalogue.createLogicalLibrary(admin, s_libraryName, "Create logical library");
......@@ -1124,16 +1133,15 @@ TEST_P(SchedulerTest, expandRepackRequest) {
//Create a storage class in the catalogue
common::dataStructures::StorageClass storageClass;
storageClass.diskInstance = "disk_instance";
storageClass.name = "storage_class";
storageClass.diskInstance = s_diskInstance;
storageClass.name = s_storageClassName;
storageClass.nbCopies = 2;
storageClass.comment = "Create storage class";
catalogue.createStorageClass(admin, storageClass);
const std::string checksumType = "checksum_type";
const std::string checksumValue = "checksum_value";
const std::string tapeDrive = "tape_drive";
const uint64_t nbArchiveFiles = 10;
const uint64_t nbArchiveFilesPerTape = 10;
const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
const uint64_t compressedFileSize = archiveFileSize;
......@@ -1143,7 +1151,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
uint64_t archiveFileId = 1;
for(uint64_t i = 1; i<= nbTapesToRepack;++i){
std::string currentVid = allVid.at(i-1);
for(uint64_t j = 1; j <= nbArchiveFiles; ++j) {
for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) {
std::ostringstream diskFileId;
diskFileId << (12345677 + archiveFileId);
std::ostringstream diskFilePath;
......@@ -1154,13 +1162,13 @@ TEST_P(SchedulerTest, expandRepackRequest) {
fileWritten.diskInstance = storageClass.diskInstance;
fileWritten.diskFileId = diskFileId.str();
fileWritten.diskFilePath = diskFilePath.str();
fileWritten.diskFileUser = "public_disk_user";
fileWritten.diskFileGroup = "public_disk_group";
fileWritten.diskFileRecoveryBlob = "opaque_disk_file_recovery_contents";
fileWritten.diskFileUser = diskFileUser;
fileWritten.diskFileGroup = diskFileGroup;
fileWritten.diskFileRecoveryBlob = diskFileRecoveryBlob;
fileWritten.size = archiveFileSize;
fileWritten.checksumType = checksumType;
fileWritten.checksumValue = checksumValue;
fileWritten.storageClassName = storageClass.name;
fileWritten.storageClassName = s_storageClassName;
fileWritten.vid = currentVid;
fileWritten.fSeq = j;
fileWritten.blockId = j * 100;
......@@ -1207,10 +1215,11 @@ TEST_P(SchedulerTest, expandRepackRequest) {
//For each tapes
std::string vid = allVid.at(i-1);
std::list<common::dataStructures::RetrieveJob> retrieveJobs = scheduler.getPendingRetrieveJobs(vid,lc);
ASSERT_EQ(retrieveJobs.size(),nbArchiveFiles);
ASSERT_EQ(retrieveJobs.size(),nbArchiveFilesPerTape);
int j = 1;
for(auto retrieveJob : retrieveJobs){
//Test that the informations are correct for each file
ASSERT_EQ(retrieveJob.request.tapePool,s_tapePoolName);
ASSERT_EQ(retrieveJob.request.archiveFileID,archiveFileId++);
ASSERT_EQ(retrieveJob.fileSize,compressedFileSize);
std::stringstream ss;
......@@ -1232,9 +1241,8 @@ TEST_P(SchedulerTest, expandRepackRequest) {
//Now, we need to simulate a retrieve for each file
{
// Emulate a tape server by asking for nbTapesForTest mount and then all files
uint64_t archiveFileId1 = 1;
uint64_t archiveFileId2 = 1;
for(uint64_t i = 1; i<= nbTapesForTest ;++i){
for(uint64_t i = 1; i<= nbTapesForTest ;++i)
{
std::unique_ptr<cta::TapeMount> mount;
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE(nullptr, mount.get());
......@@ -1246,7 +1254,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs;
//For each tape we will see if the retrieve jobs are not null
for(uint64_t j = 1; j<=nbArchiveFiles; ++j)
for(uint64_t j = 1; j<=nbArchiveFilesPerTape; ++j)
{
auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc);
retrieveJob.reset(jobBatch.front().release());
......@@ -1270,7 +1278,11 @@ TEST_P(SchedulerTest, expandRepackRequest) {
rrp.waitThread();
ASSERT_EQ(rrp.allThreadsDone(),true);
}
uint64_t archiveFileId = 1;
for(uint64_t i = 1; i<= nbTapesForTest ;++i)
{
//After the jobs reported as completed, we will test that all jobs have been put in
//the RetrieveQueueToReportToRepackForSuccess and that they have the status RJS_Succeeded
{
......@@ -1287,7 +1299,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
rq.fetch();
//There should be nbArchiveFiles jobs in the retrieve queue
ASSERT_EQ(rq.dumpJobs().size(),nbArchiveFiles);
ASSERT_EQ(rq.dumpJobs().size(),nbArchiveFilesPerTape);
int j = 1;
for (auto &job: rq.dumpJobs()) {
//Create the retrieve request from the address of the job and the current backend
......@@ -1307,16 +1319,18 @@ TEST_P(SchedulerTest, expandRepackRequest) {
ASSERT_EQ(tapeFile.compressedSize, compressedFileSize);
//Testing scheduler retrieve request
ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId1++);
ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId++);
std::stringstream ss;
ss<<"repack://public_dir/public_file_"<<i<<"_"<<j;
ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str());
ASSERT_EQ(schedulerRetrieveRequest.isRepack,true);
ASSERT_EQ(schedulerRetrieveRequest.tapePool,s_tapePoolName);
std::ostringstream diskFilePath;
diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j;
ASSERT_EQ(schedulerRetrieveRequest.diskFileInfo.path,diskFilePath.str());
//Testing the retrieve request
ASSERT_EQ(retrieveRequest.isRepack(),true);
ASSERT_EQ(retrieveRequest.getTapePool(),s_tapePoolName);
ASSERT_EQ(retrieveRequest.getQueueType(),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess);
ASSERT_EQ(retrieveRequest.getRetrieveFileQueueCriteria().mountPolicy,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack);
ASSERT_EQ(retrieveRequest.getActiveCopyNumber(),1);
......@@ -1329,21 +1343,39 @@ TEST_P(SchedulerTest, expandRepackRequest) {
++j;
}
}
}
archiveFileId = 1;
std::vector<std::string> retrieveRequestsAddresses;
for(uint64_t i = 1; i<= nbTapesForTest ;++i)
{
//We will now test the getNextSucceededRetrieveRequestForRepackBatch method that
//pop all the RetrieveRequest from the RetrieveQueueToReportToRepackForSuccess queue
auto listSucceededRetrieveRequests = schedulerDB.getNextSucceededRetrieveRequestForRepackBatch(nbArchiveFilesPerTape,lc);
{
auto listSucceededRetrieveRequests = scheduler.getNextSucceededRetrieveRequestForRepackBatch(15,lc);
ASSERT_EQ(listSucceededRetrieveRequests.size(),nbArchiveFiles);
ASSERT_EQ(listSucceededRetrieveRequests.size(),nbArchiveFilesPerTape);
int j = 1;
for (auto &retrieveRequest: listSucceededRetrieveRequests) {
//Create the retrieve request from the address of the job and the current backend
uint64_t copyNb = retrieveRequest->selectedCopyNb;