Commit f8a1c3ed authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Added total files and bytes statistics in the RepackRequest

parent d4ad698d
......@@ -227,6 +227,27 @@ uint64_t RepackRequest::getLastExpandedFSeq() {
return m_payload.lastexpandedfseq();
}
void RepackRequest::addFileToRetrieve(const uint64_t nbFilesToRetrieve){
checkPayloadWritable();
m_payload.set_totalfilestoretrieve(m_payload.totalfilestoretrieve()+nbFilesToRetrieve);
}
void RepackRequest::addBytesToRetrieve(const uint64_t nbBytesToRetrieve){
checkPayloadWritable();
m_payload.set_totalbytestoretrieve(m_payload.totalbytestoretrieve()+nbBytesToRetrieve);
}
void RepackRequest::addFileToArchive(const uint64_t nbFilesToArchive){
checkPayloadWritable();
m_payload.set_totalfilestoarchive(m_payload.totalfilestoarchive()+nbFilesToArchive);
}
void RepackRequest::addBytesToArchive(const uint64_t nbBytesToArchive) {
checkPayloadWritable();
m_payload.set_totalbytestoarchive(m_payload.totalbytestoarchive()+nbBytesToArchive);
}
//------------------------------------------------------------------------------
// RepackRequest::reportRetriveSuccesses()
//------------------------------------------------------------------------------
......
......@@ -78,6 +78,11 @@ public:
void setLastExpandedFSeq(uint64_t lastExpandedFSeq);
uint64_t getLastExpandedFSeq();
void addFileToRetrieve(const uint64_t nbFilesToRetrieve);
void addBytesToRetrieve(const uint64_t nbBytesToRetrieve);
void addFileToArchive(const uint64_t nbFilesToArchive);
void addBytesToArchive(const uint64_t nbBytesToArchive);
struct SubrequestStatistics {
uint64_t fSeq;
uint64_t files = 1;
......
......@@ -1568,7 +1568,7 @@ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRepackJobToEx
}
//------------------------------------------------------------------------------
// OStoreDB::getNextRepackJobToExpand()
// OStoreDB::getNextRepackReportBatch()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextRepackReportBatch(log::LogContext& lc) {
try {
......@@ -2297,6 +2297,16 @@ void OStoreDB::RepackRequest::expandDone() {
m_repackRequest.commit();
}
void OStoreDB::RepackRequest::setTotalStats(const TotalStatsFiles& stats){
ScopedExclusiveLock rrl(m_repackRequest);
m_repackRequest.fetch();
m_repackRequest.addFileToArchive(stats.totalFilesToArchive);
m_repackRequest.addBytesToArchive(stats.totalBytesToArchive);
m_repackRequest.addFileToRetrieve(stats.totalFilesToRetrieve);
m_repackRequest.addBytesToRetrieve(stats.totalBytesToRetrieve);
m_repackRequest.commit();
}
//------------------------------------------------------------------------------
// OStoreDB::RepackRequest::fail()
//------------------------------------------------------------------------------
......
......@@ -347,6 +347,8 @@ public:
void expandDone() override;
void fail() override;
uint64_t getLastExpandedFSeq() override;
void setTotalStats(const TotalStatsFiles& stats) override;
private:
OStoreDB & m_oStoreDB;
objectstore::RepackRequest m_repackRequest;
......
......@@ -424,6 +424,8 @@ const std::string Scheduler::generateRetrieveDstURL(const cta::common::dataStruc
void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext& lc) {
std::list<common::dataStructures::ArchiveFile> files;
auto repackInfo = repackRequest->getRepackInfo();
cta::SchedulerDatabase::RepackRequest::TotalStatsFiles totalStatsFile;
typedef cta::common::dataStructures::RepackInfo::Type RepackType;
if (repackInfo.type != RepackType::MoveOnly) {
log::ScopedParamContainer params(lc);
......@@ -457,6 +459,8 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
auto & rsr = retrieveSubrequests.back();
rsr.archiveFile = archiveFile;
rsr.fSeq = std::numeric_limits<decltype(rsr.fSeq)>::max();
totalStatsFile.totalBytesToRetrieve += rsr.archiveFile.fileSize;
totalStatsFile.totalFilesToRetrieve += 1;
// We have to determine which copynbs we want to rearchive, and under which fSeq we record this file.
if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::MoveOnly) {
// determine which fSeq(s) (normally only one) lives on this tape.
......@@ -468,6 +472,8 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
// restart of expansion.
rsr.fSeq = std::min(tc.second.fSeq, rsr.fSeq);
maxAddedFSeq = std::max(maxAddedFSeq, rsr.fSeq);
totalStatsFile.totalFilesToArchive += 1;
totalStatsFile.totalBytesToArchive += rsr.archiveFile.fileSize;
}
}
if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::AddCopiesOnly) {
......@@ -494,6 +500,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
// We know that the fSeq processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted).
// We pass this information to the db for recording in the repack request. This will allow restarting from the right
// value in case of crash.
repackRequest->m_dbReq->setTotalStats(totalStatsFile);
repackRequest->m_dbReq->addSubrequests(retrieveSubrequests, archiveRoutesMap, fSeq - 1, lc);
fSeq = std::max(fSeq, maxAddedFSeq + 1);
}
......
......@@ -439,8 +439,19 @@ public:
std::set<uint32_t> copyNbsToRearchive;
std::string fileBufferURL;
};
//Struct to hold the RepackRequest's total stats
struct TotalStatsFiles{
uint64_t totalFilesToArchive = 0;
uint64_t totalBytesToArchive = 0;
uint64_t totalFilesToRetrieve = 0;
uint64_t totalBytesToRetrieve = 0;
//TODO : userprovidedfiles and userprovidedbytes
};
virtual void addSubrequests(std::list<Subrequest>& repackSubrequests,
cta::common::dataStructures::ArchiveRoute::FullMap & archiveRoutesMap, uint64_t maxFSeqLowBound, log::LogContext & lc) = 0;
virtual void setTotalStats(const TotalStatsFiles& stats) = 0;
virtual void expandDone() = 0;
virtual void fail() = 0;
virtual ~RepackRequest() {}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment