Commit ed6a10e1 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Disk files are removed after an archive repack subrequest deletion

The folder containing the files need now to be removed
parent 0e30e337
......@@ -157,7 +157,9 @@ target_link_libraries (ctacommon
uuid
z
cap
XrdCl)
XrdCl
${XROOTD_XRDCLIENT_LIB}
)
set (COMMON_UNIT_TESTS_LIB_SRC_FILES
checksum/ChecksumTest.cpp
......@@ -201,4 +203,3 @@ set_property(TARGET ctacommonunittests PROPERTY VERSION "${CTA_LIBVERSION}")
install(TARGETS ctacommonunittests DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
add_executable(mutexLtrace threading/MutexLtrace.cpp)
target_link_libraries(mutexLtrace ctacommon)
......@@ -36,6 +36,7 @@
#include <sys/utsname.h>
#include <sys/prctl.h>
#include <iomanip>
#include <xrootd/XrdClient/XrdClientUrlInfo.hh>
using cta::exception::Exception;
......@@ -793,5 +794,10 @@ std::string getCurrentLocalTime() {
return std::string(buff) + std::string(buff2);
}
std::string truncateXrootdPath(const std::string& path){
XrdClientUrlInfo urlInfo(path.c_str());
return std::string(urlInfo.File.c_str());
}
} // namespace utils
} // namespace cta
......@@ -370,6 +370,14 @@ namespace utils {
* @return the foramtted time.
*/
std::string getCurrentLocalTime();
/**
* Remove the root:// part of the path passed in parameter
* and return the corresponding string
* @param path the xroot path to truncate
* @return the xroot path truncated
*/
std::string truncateXrootdPath(const std::string &path);
} // namespace utils
......
......@@ -17,7 +17,7 @@
*/
#include "common/exception/Exception.hpp"
#include "common/utils/utils.cpp"
#include "common/utils/utils.hpp"
#include "rdbms/ConnPool.hpp"
#include "rdbms/Conn.hpp"
......
......@@ -178,7 +178,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
// Get the next job to report and make sure we will not attempt to process it twice.
job = std::move(successfulArchiveJobs.front());
successfulArchiveJobs.pop();
if (!job.get()) continue;
if (!job.get()) continue;
tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release());
files++;
bytes+=job->archiveFile.fileSize;
......
......@@ -37,6 +37,7 @@
#include "common/make_unique.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
#include "Scheduler.hpp"
#include "tapeserver/castor/tape/tapeserver/file/DiskFile.hpp"
#include <algorithm>
#include <cmath>
#include <stdlib.h> /* srand, rand */
......@@ -4124,13 +4125,32 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
}
}
timingList.insertAndReset("asyncUpdateOrDeleteLaunchTime", t);
struct DiskFileRemovers{
std::unique_ptr<castor::tape::diskFile::AsyncDiskFileRemover> asyncRemover;
RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> & subrequestInfo;
typedef std::list<DiskFileRemovers> List;
};
DiskFileRemovers::List diskFileRemoverList;
for (auto & d: deletersList) {
try {
d.deleter->wait();
log::ScopedParamContainer params(lc);
params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet());
lc.log(log::INFO, "In OStoreDB::RepackArchiveReportBatch::report(): deleted request.");
try {
//Subrequest deleted, async delete the file from the disk
castor::tape::diskFile::DiskFileRemoverFactory fileRemoverFactory;
std::unique_ptr<castor::tape::diskFile::DiskFileRemover> remover(fileRemoverFactory.createDiskFileRemover(d.subrequestInfo.repackInfo.fileBufferURL));
diskFileRemoverList.push_back(DiskFileRemovers{std::unique_ptr<castor::tape::diskFile::AsyncDiskFileRemover>(new castor::tape::diskFile::AsyncDiskFileRemover(std::move(remover))),d.subrequestInfo});
diskFileRemoverList.back().asyncRemover->asyncDelete();
} catch (const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet());
lc.log(log::INFO, "In OStoreDB::RepackArchiveReportBatch::report(): deleted request.");
.add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet())
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::RepackArchiveReportBatch::report(): async deletion of disk file failed.");
}
} catch (cta::exception::Exception & ex) {
// Log the error
log::ScopedParamContainer params(lc);
......@@ -4140,6 +4160,24 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
lc.log(log::ERR, "In OStoreDB::RepackArchiveReportBatch::report(): async deletion failed.");
}
}
for(auto & dfr: diskFileRemoverList){
try {
dfr.asyncRemover->wait();
log::ScopedParamContainer params(lc);
params.add("fileId", dfr.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", dfr.subrequestInfo.subrequest->getAddressIfSet())
.add("fileBufferURL", dfr.subrequestInfo.repackInfo.fileBufferURL);
lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async deleted file.");
} catch (const cta::exception::Exception& ex){
// Log the error
log::ScopedParamContainer params(lc);
params.add("fileId", dfr.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", dfr.subrequestInfo.subrequest->getAddressIfSet())
.add("fileBufferURL", dfr.subrequestInfo.repackInfo.fileBufferURL)
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async file not deleted.");
}
}
for (auto & jou: jobOwnerUpdatersList) {
try {
jou.jobOwnerUpdater->wait();
......
......@@ -62,7 +62,7 @@ target_link_libraries(BasicReadWriteTest
add_library(File
${TAPESERVER_FILE_LIBRARY_SRCS})
target_link_libraries (File XrdCl XrdClient cryptopp radosstriper)
target_link_libraries (File XrdCl cryptopp radosstriper)
if(CMAKE_COMPILER_IS_GNUCC)
if(GCC_VERSION_GE_4_8_0)
......
......@@ -22,12 +22,12 @@
*****************************************************************************/
#include <sys/types.h>
#include <sys/stat.h>
#include <xrootd/XrdClient/XrdClientUrlInfo.hh>
#include "castor/tape/tapeserver/file/DiskFileImplementations.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/exception/Errnum.hpp"
#include "common/threading/MutexLocker.hpp"
#include "common/utils/utils.hpp"
#include <rados/buffer.h>
#include <xrootd/XrdCl/XrdClFile.hh>
#include <uuid/uuid.h>
......@@ -594,6 +594,70 @@ void RadosStriperWriteFile::close() {
RadosStriperWriteFile::~RadosStriperWriteFile() throw() {}
//==============================================================================
// DiskFileRemover FACTORY
//==============================================================================
DiskFileRemoverFactory::DiskFileRemoverFactory():
m_URLLocalFile("^file://(.*)$"),
m_URLXrootdFile("^(root://.*)$"){}
DiskFileRemover * DiskFileRemoverFactory::createDiskFileRemover(const std::string &path){
// URL path parsing
std::vector<std::string> regexResult;
//local file URL?
regexResult = m_URLLocalFile.exec(path);
if(regexResult.size()){
return new LocalDiskFileRemover(regexResult[1]);
}
regexResult = m_URLXrootdFile.exec(path);
if(regexResult.size()){
return new XRootdDiskFileRemover(path);
}
throw cta::exception::Exception("In DiskFileRemoverFactory::createDiskFileRemover: unknown type of URL");
}
//==============================================================================
// LocalDiskFileRemover
//==============================================================================
LocalDiskFileRemover::LocalDiskFileRemover(const std::string &path){
m_URL = path;
}
void LocalDiskFileRemover::remove(){
cta::exception::Errnum::throwOnNonZero(::remove(m_URL.c_str()),"In LocalDiskFileRemover::remove(), failed to delete the file at "+m_URL);
}
//==============================================================================
// XRootdDiskFileRemover
//==============================================================================
XRootdDiskFileRemover::XRootdDiskFileRemover(const std::string& path):m_xrootFileSystem(path){
m_URL = path;
m_truncatedFileURL = cta::utils::truncateXrootdPath(path);
}
void XRootdDiskFileRemover::remove(){
//XrdCl::ResponseHandler response;
XrdCl::XRootDStatus statusRm = m_xrootFileSystem.Rm(m_truncatedFileURL,c_xrootTimeout);
/*XrdCl::AnyObject obj;
response.HandleResponse(&statusRm,&obj);*/
cta::exception::XrootCl::throwOnError(statusRm,"In XRootdDiskFileRemover::remove(), fail to remove file at "+m_URL);
}
//==============================================================================
// AsyncDiskFileRemover
//==============================================================================
AsyncDiskFileRemover::AsyncDiskFileRemover(std::unique_ptr<DiskFileRemover> diskFileRemover):m_diskFileRemover(std::move(diskFileRemover)){
}
void AsyncDiskFileRemover::asyncDelete(){
m_futureDeletion = std::async(std::launch::async,[this](){m_diskFileRemover->remove();});
}
void AsyncDiskFileRemover::wait(){
m_futureDeletion.get();
}
//==============================================================================
// DIRECTORY FACTORY
......@@ -657,12 +721,7 @@ std::set<std::string> LocalDirectory::getFilesName(){
//==============================================================================
XRootdDirectory::XRootdDirectory(const std::string& path):m_xrootFileSystem(path){
m_URL = path;
m_truncatedDirectoryURL = this->truncatePath(path);
}
std::string XRootdDirectory::truncatePath(const std::string &path) {
XrdClientUrlInfo urlInfo(path.c_str());
return std::string(urlInfo.File.c_str());
m_truncatedDirectoryURL = cta::utils::truncateXrootdPath(path);
}
void XRootdDirectory::mkdir() {
......
......@@ -29,6 +29,7 @@
#include <memory>
#include <stdint.h>
#include <set>
#include <future>
/*
* This file only contains the interface declaration of the base classes
* the real implementation, which depends on many includes is hidden in
......@@ -48,6 +49,7 @@ namespace castor {
class ReadFile;
class WriteFile;
class DiskFileRemover;
class Directory;
/**
......@@ -147,7 +149,39 @@ namespace castor {
*/
std::string m_URL;
};
/**
* Factory class deciding which disk file remover
* to instanciate regarding the format of the path pass of the disk file
*/
class DiskFileRemoverFactory {
typedef cta::utils::Regex Regex;
public:
DiskFileRemoverFactory();
DiskFileRemover * createDiskFileRemover(const std::string &path);
private:
Regex m_URLLocalFile;
Regex m_URLXrootdFile;
};
class DiskFileRemover{
public:
virtual void remove() = 0;
virtual ~DiskFileRemover() throw() {}
protected:
std::string m_URL;
};
class AsyncDiskFileRemover{
public:
AsyncDiskFileRemover(std::unique_ptr<DiskFileRemover> diskFileRemover);
void asyncDelete();
void wait();
private:
std::future<void> m_futureDeletion;
std::unique_ptr<DiskFileRemover> m_diskFileRemover;
};
/**
* Factory class deciding what type of Directory subclass
* to instanciate based on the URL passed
......
......@@ -175,6 +175,22 @@ namespace castor {
size_t m_writePosition;
};
class LocalDiskFileRemover: public DiskFileRemover {
public:
LocalDiskFileRemover(const std::string& path);
void remove() override;
};
class XRootdDiskFileRemover: public DiskFileRemover{
public:
XRootdDiskFileRemover(const std::string &path);
void remove() override;
private:
XrdCl::FileSystem m_xrootFileSystem;
std::string m_truncatedFileURL; // root://.../ part of the path is removed
const uint16_t c_xrootTimeout = 15;
};
class LocalDirectory: public Directory {
public:
LocalDirectory(const std::string& path);
......@@ -189,14 +205,7 @@ namespace castor {
virtual void mkdir() override;
virtual bool exist() override;
virtual std::set<std::string> getFilesName() override;
private:
/**
* Remove the root://.../ from the path passed in parameter
* @param path the path to truncate
* @return the truncated path
*/
std::string truncatePath(const std::string &path);
private:
XrdCl::FileSystem m_xrootFileSystem;
std::string m_truncatedDirectoryURL; // root://.../ part of the path is removed
const uint16_t c_xrootTimeout = 15;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment