Commit 982f7579 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Refactored the async deletion of repack subrequests files via XRootD (used the...

Refactored the async deletion of repack subrequests files via XRootD (used the xroot's async deletion)
Deleted the folder containing the repack subrequests files (Local file + XRootD)
parent ed6a10e1
......@@ -794,7 +794,7 @@ std::string getCurrentLocalTime() {
return std::string(buff) + std::string(buff2);
}
std::string truncateXrootdPath(const std::string& path){
std::string extractPathFromXrootdPath(const std::string& path){
XrdClientUrlInfo urlInfo(path.c_str());
return std::string(urlInfo.File.c_str());
}
......
......@@ -377,7 +377,7 @@ namespace utils {
* @param path the xroot path to truncate
* @return the xroot path truncated
*/
std::string truncateXrootdPath(const std::string &path);
std::string extractPathFromXrootdPath(const std::string &path);
} // namespace utils
......
......@@ -4140,9 +4140,9 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
lc.log(log::INFO, "In OStoreDB::RepackArchiveReportBatch::report(): deleted request.");
try {
//Subrequest deleted, async delete the file from the disk
castor::tape::diskFile::DiskFileRemoverFactory fileRemoverFactory;
std::unique_ptr<castor::tape::diskFile::DiskFileRemover> remover(fileRemoverFactory.createDiskFileRemover(d.subrequestInfo.repackInfo.fileBufferURL));
diskFileRemoverList.push_back(DiskFileRemovers{std::unique_ptr<castor::tape::diskFile::AsyncDiskFileRemover>(new castor::tape::diskFile::AsyncDiskFileRemover(std::move(remover))),d.subrequestInfo});
castor::tape::diskFile::AsyncDiskFileRemoverFactory asyncDiskFileRemoverFactory;
std::unique_ptr<castor::tape::diskFile::AsyncDiskFileRemover> asyncRemover(asyncDiskFileRemoverFactory.createAsyncDiskFileRemover(d.subrequestInfo.repackInfo.fileBufferURL));
diskFileRemoverList.push_back({std::move(asyncRemover),d.subrequestInfo});
diskFileRemoverList.back().asyncRemover->asyncDelete();
} catch (const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
......@@ -4177,6 +4177,28 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async file not deleted.");
}
if(&dfr == &(diskFileRemoverList.back())){
//We deleted the last file, delete the buffer directory
castor::tape::diskFile::DirectoryFactory directoryFactory;
std::string directoryPath = cta::utils::getEnclosingPath(dfr.subrequestInfo.repackInfo.fileBufferURL);
std::unique_ptr<castor::tape::diskFile::Directory> directory;
try{
directory.reset(directoryFactory.createDirectory(directoryPath));
directory->rmdir();
log::ScopedParamContainer params(lc);
params.add("fileId", dfr.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", dfr.subrequestInfo.subrequest->getAddressIfSet())
.add("fileBufferURL", dfr.subrequestInfo.repackInfo.fileBufferURL);
lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): deleted the "+directoryPath+" directory");
} catch (const cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("fileId", dfr.subrequestInfo.archiveFile.archiveFileID)
.add("subrequestAddress", dfr.subrequestInfo.subrequest->getAddressIfSet())
.add("fileBufferURL", dfr.subrequestInfo.repackInfo.fileBufferURL)
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): failed to remove the "+directoryPath+" directory");
}
}
}
for (auto & jou: jobOwnerUpdatersList) {
try {
......
......@@ -595,23 +595,23 @@ void RadosStriperWriteFile::close() {
RadosStriperWriteFile::~RadosStriperWriteFile() throw() {}
//==============================================================================
// DiskFileRemover FACTORY
// AsyncDiskFileRemover FACTORY
//==============================================================================
DiskFileRemoverFactory::DiskFileRemoverFactory():
AsyncDiskFileRemoverFactory::AsyncDiskFileRemoverFactory():
m_URLLocalFile("^file://(.*)$"),
m_URLXrootdFile("^(root://.*)$"){}
DiskFileRemover * DiskFileRemoverFactory::createDiskFileRemover(const std::string &path){
AsyncDiskFileRemover * AsyncDiskFileRemoverFactory::createAsyncDiskFileRemover(const std::string &path){
// URL path parsing
std::vector<std::string> regexResult;
//local file URL?
regexResult = m_URLLocalFile.exec(path);
if(regexResult.size()){
return new LocalDiskFileRemover(regexResult[1]);
return new AsyncLocalDiskFileRemover(regexResult[1]);
}
regexResult = m_URLXrootdFile.exec(path);
if(regexResult.size()){
return new XRootdDiskFileRemover(path);
return new AsyncXRootdDiskFileRemover(path);
}
throw cta::exception::Exception("In DiskFileRemoverFactory::createDiskFileRemover: unknown type of URL");
}
......@@ -633,29 +633,59 @@ void LocalDiskFileRemover::remove(){
//==============================================================================
XRootdDiskFileRemover::XRootdDiskFileRemover(const std::string& path):m_xrootFileSystem(path){
m_URL = path;
m_truncatedFileURL = cta::utils::truncateXrootdPath(path);
m_truncatedFileURL = cta::utils::extractPathFromXrootdPath(path);
}
void XRootdDiskFileRemover::remove(){
//XrdCl::ResponseHandler response;
XrdCl::XRootDStatus statusRm = m_xrootFileSystem.Rm(m_truncatedFileURL,c_xrootTimeout);
/*XrdCl::AnyObject obj;
response.HandleResponse(&statusRm,&obj);*/
cta::exception::XrootCl::throwOnError(statusRm,"In XRootdDiskFileRemover::remove(), fail to remove file at "+m_URL);
cta::exception::XrootCl::throwOnError(statusRm,"In XRootdDiskFileRemover::remove(), fail to remove file.");;
}
void XRootdDiskFileRemover::removeAsync(AsyncXRootdDiskFileRemover::XRootdFileRemoverResponseHandler &responseHandler){
XrdCl::XRootDStatus statusRm = m_xrootFileSystem.Rm(m_truncatedFileURL,&responseHandler,c_xrootTimeout);
try{
cta::exception::XrootCl::throwOnError(statusRm,"In XRootdDiskFileRemover::remove(), fail to remove file.");
} catch(const cta::exception::Exception &e){
responseHandler.m_deletionPromise.set_exception(std::current_exception());
}
}
//==============================================================================
// AsyncDiskFileRemover
// AsyncXrootdDiskFileRemover
//==============================================================================
AsyncDiskFileRemover::AsyncDiskFileRemover(std::unique_ptr<DiskFileRemover> diskFileRemover):m_diskFileRemover(std::move(diskFileRemover)){
AsyncXRootdDiskFileRemover::AsyncXRootdDiskFileRemover(const std::string &path){
m_diskFileRemover.reset(new XRootdDiskFileRemover(path));
}
void AsyncDiskFileRemover::asyncDelete(){
void AsyncXRootdDiskFileRemover::asyncDelete(){
m_diskFileRemover->removeAsync(m_responseHandler);
}
void AsyncXRootdDiskFileRemover::wait(){
m_responseHandler.m_deletionPromise.get_future().get();
}
void AsyncXRootdDiskFileRemover::XRootdFileRemoverResponseHandler::HandleResponse(XrdCl::XRootDStatus* status, XrdCl::AnyObject* response){
try{
cta::exception::XrootCl::throwOnError(*status,"In XRootdDiskFileRemover::remove(), fail to remove file.");
m_deletionPromise.set_value();
} catch(const cta::exception::Exception &e){
m_deletionPromise.set_exception(std::current_exception());
}
}
//==============================================================================
// AsyncLocalDiskFileRemover
//==============================================================================
AsyncLocalDiskFileRemover::AsyncLocalDiskFileRemover(const std::string& path){
m_diskFileRemover.reset(new LocalDiskFileRemover(path));
}
void AsyncLocalDiskFileRemover::asyncDelete(){
m_futureDeletion = std::async(std::launch::async,[this](){m_diskFileRemover->remove();});
}
void AsyncDiskFileRemover::wait(){
void AsyncLocalDiskFileRemover::wait(){
m_futureDeletion.get();
}
......@@ -695,6 +725,11 @@ void LocalDirectory::mkdir(){
cta::exception::Errnum::throwOnMinusOne(retCode,"In LocalDirectory::mkdir(): failed to create directory at "+m_URL);
}
void LocalDirectory::rmdir(){
const int retcode = ::rmdir(m_URL.c_str());
cta::exception::Errnum::throwOnMinusOne(retcode,"In LocalDirectory::rmdir(): failed to remove the directory at "+m_URL);
}
bool LocalDirectory::exist(){
struct stat buffer;
return (stat(m_URL.c_str(), &buffer) == 0);
......@@ -721,7 +756,7 @@ std::set<std::string> LocalDirectory::getFilesName(){
//==============================================================================
XRootdDirectory::XRootdDirectory(const std::string& path):m_xrootFileSystem(path){
m_URL = path;
m_truncatedDirectoryURL = cta::utils::truncateXrootdPath(path);
m_truncatedDirectoryURL = cta::utils::extractPathFromXrootdPath(path);
}
void XRootdDirectory::mkdir() {
......@@ -729,6 +764,11 @@ void XRootdDirectory::mkdir() {
cta::exception::XrootCl::throwOnError(mkdirStatus,"In XRootdDirectory::mkdir() : failed to create directory at "+m_URL);
}
void XRootdDirectory::rmdir() {
XrdCl::XRootDStatus rmdirStatus = m_xrootFileSystem.RmDir(m_truncatedDirectoryURL, c_xrootTimeout);
cta::exception::XrootCl::throwOnError(rmdirStatus,"In XRootdDirectory::rmdir() : failed to remove directory at "+m_URL);
}
bool XRootdDirectory::exist() {
XrdCl::LocationInfo *locationDirectory;
XrdCl::XRootDStatus statStatus = m_xrootFileSystem.Locate(m_truncatedDirectoryURL,XrdCl::OpenFlags::Flags::Write,locationDirectory,c_xrootTimeout);
......@@ -741,6 +781,7 @@ bool XRootdDirectory::exist() {
std::set<std::string> XRootdDirectory::getFilesName(){
std::set<std::string> ret;
//TODO : Implement this method
return ret;
}
......
......@@ -151,14 +151,27 @@ namespace castor {
};
/**
* Factory class deciding which disk file remover
* to instanciate regarding the format of the path pass of the disk file
* This class is the base class to asynchronously delete
* Disk Files
*/
class DiskFileRemoverFactory {
class AsyncDiskFileRemover{
protected:
std::future<void> m_futureDeletion;
public:
virtual void asyncDelete() = 0;
virtual void wait() = 0;
virtual ~AsyncDiskFileRemover(){}
};
/**
* Factory class deciding which async disk file remover
* to instanciate regarding the format of the path of the disk file
*/
class AsyncDiskFileRemoverFactory {
typedef cta::utils::Regex Regex;
public:
DiskFileRemoverFactory();
DiskFileRemover * createDiskFileRemover(const std::string &path);
AsyncDiskFileRemoverFactory();
AsyncDiskFileRemover * createAsyncDiskFileRemover(const std::string &path);
private:
Regex m_URLLocalFile;
Regex m_URLXrootdFile;
......@@ -172,16 +185,6 @@ namespace castor {
std::string m_URL;
};
class AsyncDiskFileRemover{
public:
AsyncDiskFileRemover(std::unique_ptr<DiskFileRemover> diskFileRemover);
void asyncDelete();
void wait();
private:
std::future<void> m_futureDeletion;
std::unique_ptr<DiskFileRemover> m_diskFileRemover;
};
/**
* Factory class deciding what type of Directory subclass
* to instanciate based on the URL passed
......@@ -224,6 +227,11 @@ namespace castor {
*/
virtual std::set<std::string> getFilesName() = 0;
/**
* Remove the directory located at this->m_URL
*/
virtual void rmdir() = 0;
virtual ~Directory() throw() {}
protected:
/**
......
......@@ -38,6 +38,10 @@ namespace castor {
* Namespace managing the reading and writing of files to and from disk.
*/
namespace diskFile {
//Forward declarations
class XRootdDiskFileRemover;
//==============================================================================
// LOCAL FILES
//==============================================================================
......@@ -175,41 +179,104 @@ namespace castor {
size_t m_writePosition;
};
class LocalDiskFileRemover: public DiskFileRemover {
public:
LocalDiskFileRemover(const std::string& path);
void remove() override;
};
class XRootdDiskFileRemover: public DiskFileRemover{
public:
XRootdDiskFileRemover(const std::string &path);
void remove() override;
private:
XrdCl::FileSystem m_xrootFileSystem;
std::string m_truncatedFileURL; // root://.../ part of the path is removed
const uint16_t c_xrootTimeout = 15;
};
class LocalDirectory: public Directory {
public:
LocalDirectory(const std::string& path);
virtual void mkdir() override;
virtual bool exist() override;
virtual std::set<std::string> getFilesName() override;
};
class XRootdDirectory: public Directory{
public:
XRootdDirectory(const std::string& path);
virtual void mkdir() override;
virtual bool exist() override;
virtual std::set<std::string> getFilesName() override;
private:
XrdCl::FileSystem m_xrootFileSystem;
std::string m_truncatedDirectoryURL; // root://.../ part of the path is removed
const uint16_t c_xrootTimeout = 15;
};
//==============================================================================
// LocalDisk Removers
//==============================================================================
/**
* This class allows to delete a file from a local disk
*/
class LocalDiskFileRemover: public DiskFileRemover {
public:
/**
* Constructor of the LocalDiskFileRemover
* @param path the path of the file to be removed
*/
LocalDiskFileRemover(const std::string& path);
void remove() override;
};
/**
* This class allows to asynchronously delete a file from a local disk
*/
class AsyncLocalDiskFileRemover: public AsyncDiskFileRemover{
public:
/**
* Constructor of the asynchronous remover
* @param diskFileRemover the instance of the LocalDiskFileRemover that will delete the file
*/
AsyncLocalDiskFileRemover(const std::string &path);
void asyncDelete() override;
void wait() override;
private:
std::unique_ptr<LocalDiskFileRemover> m_diskFileRemover;
};
/**
* This class allows to asynchronously delete a file via XRootD
*/
class AsyncXRootdDiskFileRemover: public AsyncDiskFileRemover{
public:
friend class XRootdDiskFileRemover;
AsyncXRootdDiskFileRemover(const std::string &path);
void asyncDelete() override;
void wait() override;
private:
std::unique_ptr<XRootdDiskFileRemover> m_diskFileRemover;
class XRootdFileRemoverResponseHandler: public XrdCl::ResponseHandler{
public:
std::promise<void> m_deletionPromise;
public:
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override;
};
XRootdFileRemoverResponseHandler m_responseHandler;
};
/**
* This class allows to delete a file via XRootD (asynchronously or not)
*/
class XRootdDiskFileRemover: public DiskFileRemover{
public:
XRootdDiskFileRemover(const std::string &path);
/**
* Remove the file in a synchronous way
* @throws an exception if the XRootD call status is an Error
*/
void remove();
/**
* Remove the file in an asynchronous way
* @param responseHandler : the structure that will be updated when the asynchronous deletion is terminated
* (even if it fails)
* @throws an exception if the XRootD call status is an Error
*/
void removeAsync(AsyncXRootdDiskFileRemover::XRootdFileRemoverResponseHandler &responseHandler);
private:
XrdCl::FileSystem m_xrootFileSystem;
std::string m_truncatedFileURL; // root://.../ part of the path is removed
const uint16_t c_xrootTimeout = 15;
};
class LocalDirectory: public Directory {
public:
LocalDirectory(const std::string& path);
void mkdir() override;
bool exist() override;
std::set<std::string> getFilesName() override;
void rmdir() override;
};
class XRootdDirectory: public Directory{
public:
XRootdDirectory(const std::string& path);
void mkdir() override;
bool exist() override;
std::set<std::string> getFilesName() override;
void rmdir() override;
private:
XrdCl::FileSystem m_xrootFileSystem;
std::string m_truncatedDirectoryURL; // root://.../ part of the path is removed
const uint16_t c_xrootTimeout = 15;
};
} //end of namespace diskFile
}}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment