Commit e1a217f9 authored by Cedric CAFFY

First version of expandRepackRequest + unit test

parent 0124da44
......@@ -53,6 +53,7 @@ set (COMMON_LIB_SRC_FILES
dataStructures/RepackInfo.cpp
dataStructures/RequesterGroupMountRule.cpp
dataStructures/RequesterMountRule.cpp
dataStructures/RetrieveFileQueueCriteria.cpp
dataStructures/RetrieveJob.cpp
dataStructures/RetrieveRequest.cpp
dataStructures/SecurityIdentity.cpp
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RetrieveFileQueueCriteria.hpp"
namespace cta {
namespace common {
namespace dataStructures {
RetrieveFileQueueCriteria& RetrieveFileQueueCriteria::operator=(const RetrieveFileQueueCriteria& other){
if(this != &other){
this->archiveFile = other.archiveFile;
this->mountPolicy = other.mountPolicy;
}
return *this;
}
}}}
\ No newline at end of file
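A minimal, self-contained sketch of the pattern this new operator= supports (simplified stand-in types, not the real CTA classes): Scheduler::queueRetrieve below now default-constructs the criteria object and assigns into it only in the non-repack branch.
#include <cstdint>
#include <iostream>
#include <string>

// Simplified stand-ins for the CTA data structures (illustration only).
struct ArchiveFile { uint64_t archiveFileID = 0; };
struct MountPolicy { std::string name; };

struct RetrieveFileQueueCriteria {
  ArchiveFile archiveFile;
  MountPolicy mountPolicy;
  // Member-wise copy assignment, mirroring the operator= added in this commit.
  RetrieveFileQueueCriteria& operator=(const RetrieveFileQueueCriteria& other) {
    if (this != &other) {
      archiveFile = other.archiveFile;
      mountPolicy = other.mountPolicy;
    }
    return *this;
  }
};

int main() {
  RetrieveFileQueueCriteria fromCatalogue;
  fromCatalogue.archiveFile.archiveFileID = 42;
  fromCatalogue.mountPolicy.name = "default";

  RetrieveFileQueueCriteria queueCriteria;   // default-constructed, as in queueRetrieve
  queueCriteria = fromCatalogue;             // assigned later, only when a mount policy is needed
  std::cout << queueCriteria.archiveFile.archiveFileID << " "
            << queueCriteria.mountPolicy.name << std::endl;  // prints "42 default"
}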
......@@ -42,6 +42,8 @@ struct RetrieveFileQueueCriteria {
* The mount policy.
*/
MountPolicy mountPolicy;
RetrieveFileQueueCriteria &operator=(const RetrieveFileQueueCriteria& other);
}; // struct RetrieveFileQueueCriteria
......
......@@ -27,7 +27,7 @@ namespace dataStructures {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
RetrieveRequest::RetrieveRequest(): archiveFileID(0) {}
RetrieveRequest::RetrieveRequest(): archiveFileID(0),isRepack(false) {}
//------------------------------------------------------------------------------
// operator==
......@@ -37,7 +37,8 @@ bool RetrieveRequest::operator==(const RetrieveRequest &rhs) const {
&& archiveFileID==rhs.archiveFileID
&& dstURL==rhs.dstURL
&& diskFileInfo==rhs.diskFileInfo
&& creationLog==rhs.creationLog;
&& creationLog==rhs.creationLog
&& isRepack == rhs.isRepack;
}
//------------------------------------------------------------------------------
......@@ -55,7 +56,8 @@ std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj) {
<< " archiveFileID=" << obj.archiveFileID
<< " dstURL=" << obj.dstURL
<< " diskFileInfo=" << obj.diskFileInfo
<< " creationLog=" << obj.creationLog << ")";
<< " creationLog=" << obj.creationLog
<< " isRepack=" << obj.isRepack<<")";
return os;
}
......
......@@ -48,6 +48,7 @@ struct RetrieveRequest {
std::string errorReportURL;
DiskFileInfo diskFileInfo;
EntryLog creationLog;
bool isRepack;
}; // struct RetrieveRequest
......
......@@ -57,6 +57,7 @@ void RetrieveRequest::initialize() {
ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>::initialize();
m_payload.set_failurereportlog("");
m_payload.set_failurereporturl("");
m_payload.set_isrepack(false);
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
......@@ -850,4 +851,14 @@ void RetrieveRequest::setJobStatus(uint64_t copyNumber, const serializers::Retri
throw exception::Exception("In RetrieveRequest::setJobStatus(): job not found.");
}
bool RetrieveRequest::isRepack(){
checkPayloadReadable();
return m_payload.isrepack();
}
void RetrieveRequest::setIsRepack(const bool isRepack){
checkPayloadWritable();
m_payload.set_isrepack(isRepack);
}
}} // namespace cta::objectstore
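A minimal sketch of the accessor pattern used above, with simplified stand-ins rather than the real ObjectOps/protobuf machinery: check that the payload may be read or written, then delegate to the generated getter/setter.
#include <stdexcept>

// Illustrative stand-in for the generated serializers::RetrieveRequest payload.
struct PayloadSketch {
  bool isrepack() const { return m_isrepack; }
  void set_isrepack(bool v) { m_isrepack = v; }
private:
  bool m_isrepack = false;
};

// Illustrative stand-in for the objectstore accessors added in this commit.
class RetrieveRequestSketch {
public:
  bool isRepack() {
    checkPayloadReadable();            // the real code throws if the payload was not fetched
    return m_payload.isrepack();
  }
  void setIsRepack(const bool isRepack) {
    checkPayloadWritable();            // the real code throws if the object is not locked for writing
    m_payload.set_isrepack(isRepack);
  }
private:
  void checkPayloadReadable() const {
    if (!m_payloadInterpreted) throw std::runtime_error("payload not readable");
  }
  void checkPayloadWritable() const {
    if (!m_locked) throw std::runtime_error("payload not locked for writing");
  }
  PayloadSketch m_payload;
  bool m_payloadInterpreted = true;
  bool m_locked = true;
};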
......@@ -93,6 +93,9 @@ public:
//! The copy number to enqueue. It could be different from the updated one in mixed success/failure scenario.
serializers::RetrieveJobStatus nextStatus;
};
bool isRepack();
void setIsRepack(const bool isRepack);
private:
/*!
* Determine and set the new status of the job.
......
......@@ -369,6 +369,7 @@ message RetrieveRequest {
repeated RetrieveJob jobs = 9154;
required string failurereporturl = 9155;
required string failurereportlog = 9156;
required bool isrepack = 9157; // In protobuf, the default value for a bool field is false
}
message ValueCountPair {
......
......@@ -183,8 +183,13 @@ void Scheduler::queueRetrieve(
using utils::midEllipsis;
utils::Timer t;
// Get the queue criteria
const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc);
common::dataStructures::RetrieveFileQueueCriteria queueCriteria;
if(!request.isRepack){
queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc);
} else {
// A repack retrieve does not need a mount policy
queueCriteria.archiveFile = m_catalogue.getArchiveFileById(request.archiveFileID);
}
auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc);
auto schedulerDbTime = t.secs();
......@@ -220,15 +225,23 @@ void Scheduler::queueRetrieve(
spc.add(tc.str(), tf.second);
}
spc.add("selectedVid", selectedVid)
.add("policyName", queueCriteria.mountPolicy.name)
.add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
.add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
.add("policyPriority", queueCriteria.mountPolicy.retrievePriority)
.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime);
if(!request.isRepack){
spc.add("policyName", queueCriteria.mountPolicy.name)
.add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
.add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
.add("policyPriority", queueCriteria.mountPolicy.retrievePriority);
}
lc.log(log::INFO, "Queued retrieve request");
}
void Scheduler::queueRetrieveRequestForRepack(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
std::list<uint64_t> copyNbs, log::LogContext &lc)
{
}
//------------------------------------------------------------------------------
// deleteArchive
//------------------------------------------------------------------------------
......@@ -385,15 +398,37 @@ std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() {
return nullptr;
}
const std::string Scheduler::generateRetrieveDstURL(const cta::common::dataStructures::DiskFileInfo dfi) const{
std::ostringstream strStream;
strStream<<"repack:/"<<dfi.path;
return strStream.str();
}
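The destination URL is simply "repack:/" followed by the disk file path, which itself begins with '/', so the result looks like repack://public_dir/public_file_1, as asserted in the unit test below. A self-contained sketch of the same construction, using a stand-in DiskFileInfo:
#include <iostream>
#include <sstream>
#include <string>

// Stand-in for cta::common::dataStructures::DiskFileInfo (illustration only).
struct DiskFileInfoSketch { std::string path; };

// Same construction as Scheduler::generateRetrieveDstURL in this commit:
// "repack:/" concatenated with the disk file path.
std::string generateRetrieveDstURL(const DiskFileInfoSketch& dfi) {
  std::ostringstream strStream;
  strStream << "repack:/" << dfi.path;
  return strStream.str();
}

int main() {
  DiskFileInfoSketch dfi{"/public_dir/public_file_1"};
  // Prints "repack://public_dir/public_file_1", matching the expandRepackRequest unit test.
  std::cout << generateRetrieveDstURL(dfi) << std::endl;
}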
//------------------------------------------------------------------------------
// expandRepackRequest
//------------------------------------------------------------------------------
void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext& lc) {
typedef cta::common::dataStructures::RetrieveRequest RetrieveRequest;
RetrieveRequest retrieveRequest;
retrieveRequest.dstURL = "";
this->queueRetrieve("ExpandRepack",retrieveRequest,lc);
throw exception::Exception("In Scheduler::expandRepackRequest(): not implemented");
uint64_t fseq = c_defaultFseqForRepack;
std::list<common::dataStructures::ArchiveFile> files;
std::list<uint64_t> copyNbs;
auto vid = repackRequest->getRepackInfo().vid;
while(true) {
files = m_catalogue.getFilesForRepack(vid,fseq,c_defaultMaxNbFilesForRepack);
for(auto &archiveFile : files)
{
cta::common::dataStructures::RetrieveRequest retrieveRequest;
retrieveRequest.archiveFileID = archiveFile.archiveFileID;
retrieveRequest.diskFileInfo = archiveFile.diskFileInfo;
retrieveRequest.dstURL = generateRetrieveDstURL(archiveFile.diskFileInfo);
retrieveRequest.isRepack = true;
//retrieveRequest.requester = repackRequest->
queueRetrieve(archiveFile.diskInstance,retrieveRequest,lc);
}
if (files.size()) {
auto & tf=files.back().tapeFiles;
fseq = std::find_if(tf.cbegin(), tf.cend(), [vid](decltype(*(tf.cbegin())) &f){ return f.second.vid == vid; })->second.fSeq + 1;
} else break;
}
}
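The expansion loop pages through the catalogue by fSeq: fetch up to c_defaultMaxNbFilesForRepack files starting at the current fseq, queue a retrieve for each, then continue from the fSeq of the last file's tape copy on the repacked VID plus one, until an empty page is returned. A self-contained sketch of that pagination (simplified stand-in types and an in-memory "catalogue"; the real code calls m_catalogue.getFilesForRepack and queueRetrieve):
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <list>
#include <map>
#include <string>

// Simplified stand-ins for the catalogue types (illustration only).
struct TapeFileSketch { std::string vid; uint64_t fSeq = 0; };
struct ArchiveFileSketch {
  uint64_t archiveFileID = 0;
  std::map<uint8_t, TapeFileSketch> tapeFiles;  // keyed by copy number
};

// Fake paged lookup: up to maxFiles files having a copy on `vid` with fSeq >= startFseq.
std::list<ArchiveFileSketch> getFilesForRepack(const std::list<ArchiveFileSketch>& all,
                                               const std::string& vid,
                                               uint64_t startFseq, size_t maxFiles) {
  std::list<ArchiveFileSketch> page;
  for (const auto& f : all) {
    for (const auto& tf : f.tapeFiles) {
      if (tf.second.vid == vid && tf.second.fSeq >= startFseq) { page.push_back(f); break; }
    }
    if (page.size() >= maxFiles) break;
  }
  return page;
}

int main() {
  // Ten files on VID123 with fSeq 1..10, as in the unit test below.
  std::list<ArchiveFileSketch> catalogue;
  for (uint64_t i = 1; i <= 10; i++) {
    ArchiveFileSketch f;
    f.archiveFileID = i;
    f.tapeFiles[1] = TapeFileSketch{"VID123", i};
    catalogue.push_back(f);
  }
  const std::string vid = "VID123";
  uint64_t fseq = 1;           // c_defaultFseqForRepack
  const size_t pageSize = 4;   // stands in for c_defaultMaxNbFilesForRepack (500)
  while (true) {
    auto files = getFilesForRepack(catalogue, vid, fseq, pageSize);
    for (const auto& f : files) {
      std::cout << "queue retrieve for archiveFileID=" << f.archiveFileID << std::endl;
    }
    if (files.size()) {
      // Continue from the fSeq of the last file's copy on the repacked VID, plus one.
      const auto& tf = files.back().tapeFiles;
      fseq = std::find_if(tf.cbegin(), tf.cend(),
                          [&vid](const auto& p) { return p.second.vid == vid; })->second.fSeq + 1;
    } else break;
  }
}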
......
......@@ -142,6 +142,17 @@ public:
void queueRetrieve(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
log::LogContext &lc);
/**
* Queue a retrieve request specific for Repack
* @param instanceName
* @param request
* @param copyNbs
* @param lc
*/
void queueRetrieveRequestForRepack(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
std::list<uint64_t> copyNbs, log::LogContext &lc);
/**
* Delete an archived file or a file which is in the process of being archived.
* Throws a UserError exception in case of wrong request parameters (ex. unknown file id)
......@@ -282,6 +293,9 @@ public:
/*============== Actual mount scheduling and queue status reporting ========*/
private:
const uint64_t c_defaultFseqForRepack = 1;
const size_t c_defaultMaxNbFilesForRepack = 500;
typedef std::pair<std::string, common::dataStructures::MountType> tpType;
/**
* Common part to getNextMountDryRun() and getNextMount() to populate mount decision info.
......@@ -292,6 +306,8 @@ private:
std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList,
double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc);
const std::string generateRetrieveDstURL(const cta::common::dataStructures::DiskFileInfo dfi) const;
public:
/**
* Run the mount decision logic lock free, so we have no contention in the
......
......@@ -34,6 +34,8 @@
#include "objectstore/GarbageCollector.hpp"
#include "objectstore/BackendRadosTestSwitch.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include "common/Timer.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
#endif
......@@ -107,6 +109,7 @@ public:
const uint64_t nbConns = 1;
const uint64_t nbArchiveFileListingConns = 1;
const uint32_t maxTriesToConnect = 1;
//m_catalogue = cta::make_unique<catalogue::SchemaCreatingSqliteCatalogue>(m_dummyLog,"/home/cedric/db/test.db",nbConns,nbArchiveFileListingConns,maxTriesToConnect);
m_catalogue = cta::make_unique<catalogue::InMemoryCatalogue>(m_dummyLog, nbConns, nbArchiveFileListingConns, maxTriesToConnect);
m_scheduler = cta::make_unique<Scheduler>(*m_catalogue, *m_db, 5, 2*1000*1000);
}
......@@ -1076,6 +1079,111 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) {
ASSERT_EQ(nullRepackRequest,nullptr);
}
TEST_P(SchedulerTest, expandRepackRequest) {
using namespace cta;
setupDefaultCatalogue();
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
cta::log::DummyLogger dummyLogger("dummy","dummy");
log::LogContext lc(dummyLogger);
const std::string vid1 = "VID123";
const std::string vid2 = "VID456";
const std::string mediaType = "media_type";
const std::string vendor = "vendor";
const std::string logicalLibraryName = "logical_library_name";
const std::string tapePoolName1 = "tape_pool_name_1";
const std::string vo = "vo";
const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
const bool disabledValue = false;
const bool fullValue = false;
const std::string comment = "Create tape";
cta::common::dataStructures::SecurityIdentity admin;
admin.username = "admin_user_name";
admin.host = "admin_host";
catalogue.createLogicalLibrary(admin, logicalLibraryName, "Create logical library");
catalogue.createTapePool(admin, tapePoolName1, vo, 1, true, "Create tape pool");
catalogue.createTape(admin, vid1, mediaType, vendor, logicalLibraryName, tapePoolName1, capacityInBytes,
disabledValue, fullValue, comment);
common::dataStructures::StorageClass storageClass;
storageClass.diskInstance = "disk_instance";
storageClass.name = "storage_class";
storageClass.nbCopies = 2;
storageClass.comment = "Create storage class";
catalogue.createStorageClass(admin, storageClass);
const std::string checksumType = "checksum_type";
const std::string checksumValue = "checksum_value";
const std::string tapeDrive = "tape_drive";
const uint64_t nbArchiveFiles = 10; // Must be a multiple of 2 for this test
const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
const uint64_t compressedFileSize = archiveFileSize;
std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1;
for(uint64_t i = 1; i <= nbArchiveFiles; i++) {
std::ostringstream diskFileId;
diskFileId << (12345677 + i);
std::ostringstream diskFilePath;
diskFilePath << "/public_dir/public_file_" << i;
// Tape copy 1 written to tape
auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
auto & fileWritten = *fileWrittenUP;
fileWritten.archiveFileId = i;
fileWritten.diskInstance = storageClass.diskInstance;
fileWritten.diskFileId = diskFileId.str();
fileWritten.diskFilePath = diskFilePath.str();
fileWritten.diskFileUser = "public_disk_user";
fileWritten.diskFileGroup = "public_disk_group";
fileWritten.diskFileRecoveryBlob = "opaque_disk_file_recovery_contents";
fileWritten.size = archiveFileSize;
fileWritten.checksumType = checksumType;
fileWritten.checksumValue = checksumValue;
fileWritten.storageClassName = storageClass.name;
fileWritten.vid = vid1;
fileWritten.fSeq = i;
fileWritten.blockId = i * 100;
fileWritten.compressedSize = compressedFileSize;
fileWritten.copyNb = 1;
fileWritten.tapeDrive = tapeDrive;
tapeFilesWrittenCopy1.emplace(fileWrittenUP.release());
}
//update the DB tape
catalogue.filesWrittenToTape(tapeFilesWrittenCopy1);
scheduler.queueRepack(admin,vid1,"bufferURL",common::dataStructures::RepackInfo::Type::ExpandOnly,lc);
scheduler.promoteRepackRequestsToToExpand(lc);
auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand();
log::TimingList tl;
utils::Timer t;
scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc);
scheduler.waitSchedulerDbSubthreadsComplete();
std::list<common::dataStructures::RetrieveJob> retrieveJobs = scheduler.getPendingRetrieveJobs(vid1,lc);
ASSERT_EQ(retrieveJobs.size(),10);
int i = 1;
for(auto retrieveJob : retrieveJobs){
ASSERT_EQ(retrieveJob.request.archiveFileID,i);
ASSERT_EQ(retrieveJob.fileSize,compressedFileSize);
std::stringstream ss;
ss<<"repack://public_dir/public_file_"<<i;
ASSERT_EQ(retrieveJob.request.dstURL, ss.str());
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.copyNb,1);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.checksumType,checksumType);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.checksumValue,checksumValue);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.blockId,i*100);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.compressedSize,compressedFileSize);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.fSeq,i);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.vid,vid1);
++i;
}
}
#undef TEST_MOCK_DB
#ifdef TEST_MOCK_DB
static cta::MockSchedulerDatabaseFactory mockDbFactory;
......