Commit 0e30e337 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Added the creation of the directory corresponding to the vid of the tape to...

Added the creation of the directory corresponding to the vid of the tape to repack (xroot + local file system)
parent 47cfe8b1
cmake_minimum_required (VERSION 2.6)
include_directories (${CMAKE_CURRENT_SOURCE_DIR}
${PROJECT_SOURCE_DIR} ${CMAKE_BINARY_DIR})
${PROJECT_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${XROOTD_INCLUDE_DIR})
set (CTA_SCHEDULER_SRC_FILES
ArchiveJob.cpp
......@@ -34,7 +34,7 @@ set_property(TARGET ctascheduler PROPERTY VERSION "${CTA_LIBVERSION}")
install (TARGETS ctascheduler DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
target_link_libraries (ctascheduler ctacommon ctaobjectstore ${PROTOBUF3_LIBRARIES} ctaeos)
target_link_libraries (ctascheduler ctacommon ctaobjectstore ${PROTOBUF3_LIBRARIES} ctaeos File)
#add_library (ctaschedulerutils SHARED
# _old_prototype_DummyScheduler.cpp)
......
......@@ -38,11 +38,17 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) {
if(repackRequest != nullptr){
//We have a RepackRequest that has the status ToExpand, expand it
timingList.insertAndReset("expandRepackRequestTime", t);
m_scheduler.expandRepackRequest(repackRequest,timingList,t,lc);
log::ScopedParamContainer params(lc);
params.add("tapeVid",repackRequest->getRepackInfo().vid);
timingList.addToLog(params);
lc.log(log::INFO, "In RepackRequestManager::runOnePass(): Repack Request expanded");
try{
m_scheduler.expandRepackRequest(repackRequest,timingList,t,lc);
log::ScopedParamContainer params(lc);
params.add("tapeVid",repackRequest->getRepackInfo().vid);
timingList.addToLog(params);
lc.log(log::INFO, "In RepackRequestManager::runOnePass(): Repack Request expanded");
} catch(const cta::exception::Exception &e){
lc.log(log::ERR,e.what());
repackRequest->fail();
throw(e);
}
}
}
......
......@@ -30,6 +30,8 @@
#include "common/make_unique.hpp"
#include "objectstore/RepackRequest.hpp"
#include "RetrieveRequestDump.hpp"
#include "tapeserver/castor/tape/tapeserver/file/DiskFileImplementations.hpp"
#include "tapeserver/castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include <iostream>
#include <sstream>
......@@ -409,15 +411,6 @@ std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() {
return nullptr;
}
//------------------------------------------------------------------------------
// generateRetrieveDstURL
//------------------------------------------------------------------------------
const std::string Scheduler::generateRetrieveDstURL(const cta::common::dataStructures::DiskFileInfo dfi) const{
std::ostringstream strStream;
strStream<<"repack:/"<<dfi.path;
return strStream.str();
}
//------------------------------------------------------------------------------
// expandRepackRequest
//------------------------------------------------------------------------------
......@@ -446,54 +439,82 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
}
uint64_t fSeq = repackRequest->m_dbReq->getLastExpandedFSeq() + 1;
cta::catalogue::ArchiveFileItor archiveFilesForCatalogue = m_catalogue.getArchiveFilesForRepackItor(repackInfo.vid, fSeq);
std::stringstream dirBufferURL;
dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/";
castor::tape::diskFile::DirectoryFactory dirFactory;
std::unique_ptr<castor::tape::diskFile::Directory> dir;
dir.reset(dirFactory.createDirectory(dirBufferURL.str()));
std::set<std::string> filesInDirectory;
if(dir->exist()){
filesInDirectory = dir->getFilesName();
} else {
dir->mkdir();
}
while(archiveFilesForCatalogue.hasMore()) {
size_t filesCount = 0;
uint64_t maxAddedFSeq = 0;
std::list<SchedulerDatabase::RepackRequest::Subrequest> retrieveSubrequests;
while(filesCount < c_defaultMaxNbFilesForRepack && archiveFilesForCatalogue.hasMore())
{
auto archiveFile = archiveFilesForCatalogue.next();
filesCount++;
fSeq++;
retrieveSubrequests.push_back(cta::SchedulerDatabase::RepackRequest::Subrequest());
auto & rsr = retrieveSubrequests.back();
rsr.archiveFile = archiveFile;
rsr.fSeq = std::numeric_limits<decltype(rsr.fSeq)>::max();
totalStatsFile.totalBytesToRetrieve += rsr.archiveFile.fileSize;
totalStatsFile.totalFilesToRetrieve += 1;
auto archiveFile = archiveFilesForCatalogue.next();
auto & retrieveSubRequest = retrieveSubrequests.back();
retrieveSubRequest.archiveFile = archiveFile;
retrieveSubRequest.fSeq = std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max();
// We have to determine which copynbs we want to rearchive, and under which fSeq we record this file.
if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::MoveOnly) {
// determine which fSeq(s) (normally only one) lives on this tape.
for (auto & tc: archiveFile.tapeFiles) if (tc.vid == repackInfo.vid) {
rsr.copyNbsToRearchive.insert(tc.copyNb);
retrieveSubRequest.copyNbsToRearchive.insert(tc.copyNb);
// We make the (reasonable) assumption that the archive file only has one copy on this tape.
// If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape.
// This will prevent double subrequest creation (we already have such a mechanism in case of crash and
// restart of expansion.
rsr.fSeq = std::min(tc.fSeq, rsr.fSeq);
maxAddedFSeq = std::max(maxAddedFSeq, rsr.fSeq);
retrieveSubRequest.fSeq = std::min(tc.fSeq, retrieveSubRequest.fSeq);
totalStatsFile.totalFilesToArchive += 1;
totalStatsFile.totalBytesToArchive += rsr.archiveFile.fileSize;
totalStatsFile.totalBytesToArchive += retrieveSubRequest.archiveFile.fileSize;
}
}
if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::AddCopiesOnly) {
// We should not get here are the type is filtered at the beginning of the function.
// TODO: add support for expand.
throw cta::exception::Exception("In Scheduler::expandRepackRequest(): expand not yet supported.");
std::stringstream fileName;
fileName << std::setw(9) << std::setfill('0') << retrieveSubRequest.fSeq;
bool createArchiveSubrequest = false;
if(filesInDirectory.count(fileName.str())){
castor::tape::file::RadosStriperPool radosStriperPool;
castor::tape::diskFile::DiskFileFactory fileFactory("",0,radosStriperPool);
castor::tape::diskFile::ReadFile *fileReader = fileFactory.createReadFile(dirBufferURL.str() + fileName.str());
if(fileReader->size() == archiveFile.fileSize){
createArchiveSubrequest = true;
retrieveSubrequests.pop_back();
//TODO : We don't want to retrieve the file again, create archive subrequest
}
}
if ((rsr.fSeq == std::numeric_limits<decltype(rsr.fSeq)>::max()) || rsr.copyNbsToRearchive.empty()) {
log::ScopedParamContainer params(lc);
params.add("fileId", rsr.archiveFile.archiveFileID)
.add("repackVid", repackInfo.vid);
lc.log(log::ERR, "In Scheduler::expandRepackRequest(): no fSeq found for this file on this tape.");
retrieveSubrequests.pop_back();
} else {
// We found some copies to rearchive. We still have to decide which file path we are going to use.
// File path will be base URL + /<VID>/<fSeq>
std::stringstream fileBufferURL;
fileBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/"
<< std::setw(9) << std::setfill('0') << rsr.fSeq;
rsr.fileBufferURL = fileBufferURL.str();
if(!createArchiveSubrequest){
totalStatsFile.totalBytesToRetrieve += retrieveSubRequest.archiveFile.fileSize;
totalStatsFile.totalFilesToRetrieve += 1;
if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::AddCopiesOnly) {
// We should not get here are the type is filtered at the beginning of the function.
// TODO: add support for expand.
throw cta::exception::Exception("In Scheduler::expandRepackRequest(): expand not yet supported.");
}
if ((retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) || retrieveSubRequest.copyNbsToRearchive.empty()) {
log::ScopedParamContainer params(lc);
params.add("fileId", retrieveSubRequest.archiveFile.archiveFileID)
.add("repackVid", repackInfo.vid);
lc.log(log::ERR, "In Scheduler::expandRepackRequest(): no fSeq found for this file on this tape.");
retrieveSubrequests.pop_back();
} else {
// We found some copies to rearchive. We still have to decide which file path we are going to use.
// File path will be base URL + /<VID>/<fSeq>
/*std::stringstream fileBufferURL;
fileBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/"
<< std::setw(9) << std::setfill('0') << rsr.fSeq;*/
maxAddedFSeq = std::max(maxAddedFSeq,retrieveSubRequest.fSeq);
retrieveSubRequest.fileBufferURL = dirBufferURL.str() + fileName.str();
}
}
}
// Note: the highest fSeq will be recorded internally in the following call.
......
......@@ -302,7 +302,6 @@ private:
std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList,
double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc);
const std::string generateRetrieveDstURL(const cta::common::dataStructures::DiskFileInfo dfi) const;
public:
/**
......
......@@ -38,6 +38,7 @@
#include "objectstore/JobQueueType.hpp"
#include "objectstore/RepackIndex.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include "tests/TempDirectory.hpp"
#include "common/Timer.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "objectstore/Algorithms.hpp"
......@@ -1242,7 +1243,7 @@ TEST_P(SchedulerTest, showqueues) {
TEST_P(SchedulerTest, repack) {
using namespace cta;
unitTests::TempDirectory tempDirectory;
setupDefaultCatalogue();
Scheduler &scheduler = getScheduler();
......@@ -1256,7 +1257,7 @@ TEST_P(SchedulerTest, repack) {
// Create and then cancel repack
common::dataStructures::SecurityIdentity cliId;
std::string tape1 = "Tape";
scheduler.queueRepack(cliId, tape1, "root://server/repackDir", common::dataStructures::RepackInfo::Type::MoveOnly, lc);
scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc);
{
auto repacks = scheduler.getRepacks();
ASSERT_EQ(1, repacks.size());
......@@ -1266,7 +1267,7 @@ TEST_P(SchedulerTest, repack) {
scheduler.cancelRepack(cliId, tape1, lc);
ASSERT_EQ(0, scheduler.getRepacks().size());
// Recreate a repack and get it moved to ToExpand
scheduler.queueRepack(cliId, "Tape2", "root://server/repackDir", common::dataStructures::RepackInfo::Type::MoveOnly, lc);
scheduler.queueRepack(cliId, "Tape2", "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc);
{
auto repacks = scheduler.getRepacks();
ASSERT_EQ(1, repacks.size());
......@@ -1283,6 +1284,7 @@ TEST_P(SchedulerTest, repack) {
TEST_P(SchedulerTest, getNextRepackRequestToExpand) {
using namespace cta;
unitTests::TempDirectory tempDirectory;
setupDefaultCatalogue();
......@@ -1294,10 +1296,10 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) {
// Create a repack request
common::dataStructures::SecurityIdentity cliId;
std::string tape1 = "Tape";
scheduler.queueRepack(cliId, tape1, "root://server/repackDir", common::dataStructures::RepackInfo::Type::MoveOnly, lc);
scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc);
std::string tape2 = "Tape2";
scheduler.queueRepack(cliId,tape2,"root://server/repackDir",common::dataStructures::RepackInfo::Type::AddCopiesOnly,lc);
scheduler.queueRepack(cliId,tape2,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,lc);
//Test the repack request queued has status Pending
ASSERT_EQ(scheduler.getRepack(tape1).status,common::dataStructures::RepackInfo::Status::Pending);
......@@ -1329,6 +1331,7 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) {
TEST_P(SchedulerTest, expandRepackRequest) {
using namespace cta;
unitTests::TempDirectory tempDirectory;
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
......@@ -1431,7 +1434,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
for(uint64_t i = 0; i < nbTapesToRepack ; ++i) {
scheduler.queueRepack(admin,allVid.at(i),"root://repackData/buffer",common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,allVid.at(i),"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
}
scheduler.waitSchedulerDbSubthreadsComplete();
//scheduler.waitSchedulerDbSubthreadsComplete();
......@@ -1468,7 +1471,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
ASSERT_EQ(retrieveJob.request.archiveFileID,archiveFileId++);
ASSERT_EQ(retrieveJob.fileSize,compressedFileSize);
std::stringstream ss;
ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j;
ss<<"file://"<<tempDirectory.path()<<"/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j;
ASSERT_EQ(retrieveJob.request.dstURL, ss.str());
ASSERT_EQ(retrieveJob.tapeCopies[vid].second.copyNb,1);
ASSERT_EQ(retrieveJob.tapeCopies[vid].second.checksumType,checksumType);
......@@ -1569,7 +1572,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
//Testing scheduler retrieve request
ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId++);
std::stringstream ss;
ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j;
ss<<"file://"<<tempDirectory.path()<<"/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j;
ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str());
// TODO ASSERT_EQ(schedulerRetrieveRequest.isRepack,true);
// TODO ASSERT_EQ(schedulerRetrieveRequest.tapePool,s_tapePoolName);
......@@ -1642,7 +1645,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
ASSERT_EQ(archiveFile.fileSize,archiveFileSize);
ASSERT_EQ(archiveFile.storageClass,s_storageClassName);
std::stringstream ss;
ss<<"root://repackData/buffer/"<<allVid.at(tapeIndex-1)<<"/"<<std::setw(9)<<std::setfill('0')<<fileIndex;
ss<<"file://"<<tempDirectory.path()<<"/"<<allVid.at(tapeIndex-1)<<"/"<<std::setw(9)<<std::setfill('0')<<fileIndex;
ASSERT_EQ(ar.getSrcURL(),ss.str());
for(auto archiveJob : ar.dumpJobs()){
ASSERT_EQ(archiveJob.status,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForRepack);
......@@ -1662,7 +1665,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) {
using namespace cta;
using namespace cta::objectstore;
unitTests::TempDirectory tempDirectory;
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
auto &schedulerDB = getSchedulerDB();
......@@ -1755,9 +1758,8 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,vid,"root://repackData/buffer",common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.waitSchedulerDbSubthreadsComplete();
//scheduler.waitSchedulerDbSubthreadsComplete();
log::TimingList tl;
utils::Timer t;
......@@ -1903,7 +1905,7 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) {
TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) {
using namespace cta;
using namespace cta::objectstore;
unitTests::TempDirectory tempDirectory;
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
auto &schedulerDB = getSchedulerDB();
......@@ -1995,7 +1997,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,vid,"root://repackData/buffer",common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.waitSchedulerDbSubthreadsComplete();
//scheduler.waitSchedulerDbSubthreadsComplete();
......@@ -2149,7 +2151,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) {
TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) {
using namespace cta;
using namespace cta::objectstore;
unitTests::TempDirectory tempDirectory;
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
auto &schedulerDB = getSchedulerDB();
......@@ -2241,7 +2243,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) {
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,vid,"root://repackData/buffer",common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc);
scheduler.waitSchedulerDbSubthreadsComplete();
log::TimingList tl;
......
......@@ -62,7 +62,7 @@ target_link_libraries(BasicReadWriteTest
add_library(File
${TAPESERVER_FILE_LIBRARY_SRCS})
target_link_libraries (File XrdCl cryptopp radosstriper)
target_link_libraries (File XrdCl XrdClient cryptopp radosstriper)
if(CMAKE_COMPILER_IS_GNUCC)
if(GCC_VERSION_GE_4_8_0)
......
......@@ -21,8 +21,9 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include <sys/types.h>
#include <sys/stat.h>
#include <xrootd/XrdClient/XrdClientUrlInfo.hh>
#include "castor/tape/tapeserver/file/DiskFile.hpp"
#include "castor/tape/tapeserver/file/DiskFileImplementations.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/exception/Errnum.hpp"
......@@ -593,4 +594,95 @@ void RadosStriperWriteFile::close() {
RadosStriperWriteFile::~RadosStriperWriteFile() throw() {}
//==============================================================================
// DIRECTORY FACTORY
//==============================================================================
DirectoryFactory::DirectoryFactory():
m_URLLocalDirectory("^file://(.*)$"),
m_URLXrootDirectory("^(root://.*)$"){}
Directory * DirectoryFactory::createDirectory(const std::string& path){
// URL path parsing
std::vector<std::string> regexResult;
// local file URL?
regexResult = m_URLLocalDirectory.exec(path);
if (regexResult.size()) {
return new LocalDirectory(regexResult[1]);
}
// Xroot URL?
regexResult = m_URLXrootDirectory.exec(path);
if (regexResult.size()) {
return new XRootdDirectory(path);
}
throw cta::exception::Exception("In DirectoryFactory::createDirectory: unknown type of URL");
}
//==============================================================================
// LOCAL DIRECTORY
//==============================================================================
LocalDirectory::LocalDirectory(const std::string& path){
m_URL = path;
}
void LocalDirectory::mkdir(){
const int retCode = ::mkdir(m_URL.c_str(),S_IRWXU);
cta::exception::Errnum::throwOnMinusOne(retCode,"In LocalDirectory::mkdir(): failed to create directory at "+m_URL);
}
bool LocalDirectory::exist(){
struct stat buffer;
return (stat(m_URL.c_str(), &buffer) == 0);
}
std::set<std::string> LocalDirectory::getFilesName(){
std::set<std::string> names;
DIR *dir;
struct dirent *file;
dir = opendir(m_URL.c_str());
cta::exception::Errnum::throwOnNull(dir,"In LocalDirectory::getFilesName, failed to open directory at "+m_URL);
while((file = readdir(dir)) != NULL){
char *fileName = file->d_name;
if(strcmp(fileName,".") && strcmp(fileName,"..")){
names.insert(std::string(file->d_name));
}
}
cta::exception::Errnum::throwOnMinusOne(::closedir(dir),"In LocalDirectory::getFilesName(), fail to close directory at "+m_URL);
return names;
}
//==============================================================================
// XROOT DIRECTORY
//==============================================================================
XRootdDirectory::XRootdDirectory(const std::string& path):m_xrootFileSystem(path){
m_URL = path;
m_truncatedDirectoryURL = this->truncatePath(path);
}
std::string XRootdDirectory::truncatePath(const std::string &path) {
XrdClientUrlInfo urlInfo(path.c_str());
return std::string(urlInfo.File.c_str());
}
void XRootdDirectory::mkdir() {
XrdCl::XRootDStatus mkdirStatus = m_xrootFileSystem.MkDir(m_truncatedDirectoryURL,XrdCl::MkDirFlags::None,XrdCl::Access::Mode::UR | XrdCl::Access::Mode::UW | XrdCl::Access::Mode::UX,c_xrootTimeout);
cta::exception::XrootCl::throwOnError(mkdirStatus,"In XRootdDirectory::mkdir() : failed to create directory at "+m_URL);
}
bool XRootdDirectory::exist() {
XrdCl::LocationInfo *locationDirectory;
XrdCl::XRootDStatus statStatus = m_xrootFileSystem.Locate(m_truncatedDirectoryURL,XrdCl::OpenFlags::Flags::Write,locationDirectory,c_xrootTimeout);
cta::exception::XrootCl::throwOnError(statStatus,"In XrootdDirectory::exist(): fail to determine if directory exists.");
if(locationDirectory->GetSize() != 0){
return true;
}
return false;
}
std::set<std::string> XRootdDirectory::getFilesName(){
std::set<std::string> ret;
return ret;
}
}}} //end of namespace diskFile
......@@ -28,6 +28,7 @@
#include <cryptopp/rsa.h>
#include <memory>
#include <stdint.h>
#include <set>
/*
* This file only contains the interface declaration of the base classes
* the real implementation, which depends on many includes is hidden in
......@@ -47,6 +48,7 @@ namespace castor {
class ReadFile;
class WriteFile;
class Directory;
/**
* Factory class deciding on the type of read/write file type
......@@ -146,6 +148,56 @@ namespace castor {
std::string m_URL;
};
/**
* Factory class deciding what type of Directory subclass
* to instanciate based on the URL passed
*/
class DirectoryFactory{
typedef cta::utils::Regex Regex;
public:
DirectoryFactory();
/**
* Returns the correct directory subclass regarding the path passed in parameter
* @param path the path of the directory to manage
* @return the Directory subclass instance regarding the path passed in parameter
* @throws cta::exception if the path provided does not allow to determine which instance of
* Directory will be instanciated.
*/
Directory * createDirectory(const std::string &path);
private:
Regex m_URLLocalDirectory;
Regex m_URLXrootDirectory;
};
class Directory {
public:
/**
* Creates a directory
* @throws an exception if the directory could not have been created
*/
virtual void mkdir() = 0;
/**
* Check if the directory exist
* @return true if the directory exists, false otherwise
*/
virtual bool exist() = 0;
/**
* Return all the names of the files present in the directory
* @return
*/
virtual std::set<std::string> getFilesName() = 0;
virtual ~Directory() throw() {}
protected:
/**
* Storage for the URL
*/
std::string m_URL;
};
} //end of namespace diskFile
}}
......
......@@ -23,9 +23,9 @@
#pragma once
#include "castor/tape/tapeserver/file/DiskFile.hpp"
#include "castor/tape/tapeserver/file/Structures.hpp"
#include "castor/tape/tapeserver/daemon/VolumeInfo.hpp"
#include "tapeserver/castor/tape/tapeserver/file/DiskFile.hpp"
#include "tapeserver/castor/tape/tapeserver/file/Structures.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/VolumeInfo.hpp"
#include "common/exception/XrootCl.hpp"
#include "common/exception/Exception.hpp"
#include <xrootd/XrdCl/XrdClFile.hh>
......@@ -174,6 +174,33 @@ namespace castor {
std::string m_osd;
size_t m_writePosition;
};
} //end of namespace diskFile
class LocalDirectory: public Directory {
public:
LocalDirectory(const std::string& path);
virtual void mkdir() override;
virtual bool exist() override;
virtual std::set<std::string> getFilesName() override;
};
class XRootdDirectory: public Directory{
public:
XRootdDirectory(const std::string& path);
virtual void mkdir() override;
virtual bool exist() override;
virtual std::set<std::string> getFilesName() override;
private:
/**
* Remove the root://.../ from the path passed in parameter
* @param path the path to truncate
* @return the truncated path
*/
std::string truncatePath(const std::string &path);
XrdCl::FileSystem m_xrootFileSystem;
std::string m_truncatedDirectoryURL; // root://.../ part of the path is removed
const uint16_t c_xrootTimeout = 15;
};
} //end of namespace diskFile
}}
......@@ -32,6 +32,7 @@
#include "common/exception/Exception.hpp"
#include "scheduler/ArchiveJob.hpp"
#include "scheduler/RetrieveJob.hpp"
#include "DiskFileImplementations.hpp"
#include <gtest/gtest.h>
#include <memory>
......@@ -302,4 +303,40 @@ namespace unitTests {
ASSERT_EQ(strncmp(data1, data2, res1), 0);
} while(res1 || res2);
}
TEST(ctaDirectoryTests, directoryExist) {
castor::tape::diskFile::LocalDirectory dir("/tmp/");
ASSERT_TRUE(dir.exist());
castor::tape::diskFile::LocalDirectory dirNotExist("/AZERTY/");
ASSERT_FALSE(dirNotExist.exist());