Commit 526451ab authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Refactored the calling of the deletion of the repack buffer's directory

parent 982f7579
......@@ -316,8 +316,9 @@ void RepackRequest::reportRetriveFailures(SubrequestStatistics::List& retrieveFa
//------------------------------------------------------------------------------
// RepackRequest::reportArchiveSuccesses()
// Returns the status of the RepackRequest
//------------------------------------------------------------------------------
void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSuccesses) {
serializers::RepackRequestStatus RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSuccesses) {
checkPayloadWritable();
RepackSubRequestPointer::Map pointerMap;
// Read the map
......@@ -349,12 +350,13 @@ void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSu
m_payload.mutable_subrequests()->Clear();
for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
}
return m_payload.status();
}
//------------------------------------------------------------------------------
// RepackRequest::reportArchiveFailures()
//------------------------------------------------------------------------------
void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFailures) {
serializers::RepackRequestStatus RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFailures) {
checkPayloadWritable();
RepackSubRequestPointer::Map pointerMap;
// Read the map
......@@ -381,6 +383,7 @@ void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFai
m_payload.mutable_subrequests()->Clear();
for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
}
return m_payload.status();
}
//------------------------------------------------------------------------------
......
......@@ -95,8 +95,8 @@ public:
};
void reportRetriveSuccesses (SubrequestStatistics::List & retrieveSuccesses);
void reportRetriveFailures (SubrequestStatistics::List & retrieveFailures);
void reportArchiveSuccesses (SubrequestStatistics::List & archiveSuccesses);
void reportArchiveFailures (SubrequestStatistics::List & archiveFailures);
serializers::RepackRequestStatus reportArchiveSuccesses (SubrequestStatistics::List & archiveSuccesses);
serializers::RepackRequestStatus reportArchiveFailures (SubrequestStatistics::List & archiveFailures);
void reportSubRequestsForDeletion (std::list<uint64_t>& fSeqs);
enum class StatsType: uint8_t {
UserProvided,
......
......@@ -3985,16 +3985,17 @@ void OStoreDB::RepackArchiveSuccessesReportBatch::report(log::LogContext& lc) {
//------------------------------------------------------------------------------
// OStoreDB::RepackArchiveSuccessesReportBatch::recordReport()
//------------------------------------------------------------------------------
void OStoreDB::RepackArchiveSuccessesReportBatch::recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t){
serializers::RepackRequestStatus OStoreDB::RepackArchiveSuccessesReportBatch::recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t){
timingList.insertAndReset("successStatsPrepareTime", t);
objectstore::ScopedExclusiveLock rrl(m_repackRequest);
timingList.insertAndReset("successStatsLockTime", t);
m_repackRequest.fetch();
timingList.insertAndReset("successStatsFetchTime", t);
m_repackRequest.reportArchiveSuccesses(ssl);
serializers::RepackRequestStatus repackRequestStatus = m_repackRequest.reportArchiveSuccesses(ssl);
timingList.insertAndReset("successStatsUpdateTime", t);
m_repackRequest.commit();
timingList.insertAndReset("successStatsCommitTime", t);
return repackRequestStatus;
}
//------------------------------------------------------------------------------
......@@ -4013,17 +4014,19 @@ void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
//------------------------------------------------------------------------------
// OStoreDB::RepackArchiveFailureReportBatch::recordReport()
// Return false as at least 1 archive did not work
//------------------------------------------------------------------------------
void OStoreDB::RepackArchiveFailureReportBatch::recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t){
serializers::RepackRequestStatus OStoreDB::RepackArchiveFailureReportBatch::recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t){
timingList.insertAndReset("failureStatsPrepareTime", t);
objectstore::ScopedExclusiveLock rrl(m_repackRequest);
timingList.insertAndReset("failureStatsLockTime", t);
m_repackRequest.fetch();
timingList.insertAndReset("failureStatsFetchTime", t);
m_repackRequest.reportArchiveFailures(ssl);
serializers::RepackRequestStatus repackRequestStatus = m_repackRequest.reportArchiveFailures(ssl);
timingList.insertAndReset("failureStatsUpdateTime", t);
m_repackRequest.commit();
timingList.insertAndReset("failureStatsCommitTime", t);
return repackRequestStatus;
}
//------------------------------------------------------------------------------
......@@ -4071,7 +4074,8 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
// 1) Update statistics. As the repack request is protected against double reporting, we can release its lock
// before the next (deletions).
objectstore::RepackRequest::SubrequestStatistics::List statistics = prepareReport();
recordReport(statistics,timingList,t);
objectstore::serializers::RepackRequestStatus repackRequestStatus = recordReport(statistics,timingList,t);
std::string bufferURL;
// 2) For each job, determine if sibling jobs are complete or not. If so, delete, else just update status and set empty owner.
struct Deleters {
......@@ -4087,6 +4091,7 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
Deleters::List deletersList;
JobOwnerUpdaters::List jobOwnerUpdatersList;
for (auto &sri: m_subrequestList) {
bufferURL = sri.repackInfo.fileBufferURL;
bool moreJobsToDo = false;
for (auto &j: sri.archiveJobsStatusMap) {
if ((j.first != sri.archivedCopyNb) &&
......@@ -4177,27 +4182,23 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async file not deleted.");
}
if(&dfr == &(diskFileRemoverList.back())){
//We deleted the last file, delete the buffer directory
castor::tape::diskFile::DirectoryFactory directoryFactory;
std::string directoryPath = cta::utils::getEnclosingPath(dfr.subrequestInfo.repackInfo.fileBufferURL);
std::unique_ptr<castor::tape::diskFile::Directory> directory;
try{
directory.reset(directoryFactory.createDirectory(directoryPath));
directory->rmdir();
log::ScopedParamContainer params(lc);
params.add("fileId", dfr.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", dfr.subrequestInfo.subrequest->getAddressIfSet())
.add("fileBufferURL", dfr.subrequestInfo.repackInfo.fileBufferURL);
lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): deleted the "+directoryPath+" directory");
} catch (const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("fileId", dfr.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", dfr.subrequestInfo.subrequest->getAddressIfSet())
.add("fileBufferURL", dfr.subrequestInfo.repackInfo.fileBufferURL)
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): failed to remove the "+directoryPath+" directory");
}
}
if(repackRequestStatus == objectstore::serializers::RepackRequestStatus::RRS_Complete){
//Repack Request is complete, delete the directory in the buffer
castor::tape::diskFile::DirectoryFactory directoryFactory;
std::string directoryPath = cta::utils::getEnclosingPath(bufferURL);
std::unique_ptr<castor::tape::diskFile::Directory> directory;
try{
directory.reset(directoryFactory.createDirectory(directoryPath));
directory->rmdir();
log::ScopedParamContainer params(lc);
params.add("repackRequestAddress", m_repackRequest.getAddressIfSet());
lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): deleted the "+directoryPath+" directory");
} catch (const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("repackRequestAddress", m_repackRequest.getAddressIfSet())
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): failed to remove the "+directoryPath+" directory");
}
}
for (auto & jou: jobOwnerUpdatersList) {
......
......@@ -470,7 +470,7 @@ public:
void report(log::LogContext &lc);
private:
objectstore::RepackRequest::SubrequestStatistics::List prepareReport();
virtual void recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) = 0;
virtual cta::objectstore::serializers::RepackRequestStatus recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) = 0;
virtual cta::objectstore::serializers::ArchiveJobStatus getNewStatus() = 0;
};
......@@ -481,7 +481,7 @@ public:
public:
void report(log::LogContext& lc) override;
private:
void recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) override;
cta::objectstore::serializers::RepackRequestStatus recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) override;
cta::objectstore::serializers::ArchiveJobStatus getNewStatus() override;
};
......@@ -493,7 +493,7 @@ public:
public:
void report(log::LogContext& lc) override;
private:
void recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) override;
cta::objectstore::serializers::RepackRequestStatus recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) override;
cta::objectstore::serializers::ArchiveJobStatus getNewStatus() override;
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment