Commit 75cb8f14 authored by Sebastien Ponce's avatar Sebastien Ponce
Browse files

Adapted synchronization to DataPools and ceph

parent b68ec1c9
......@@ -55,7 +55,53 @@
// Constructor
//-----------------------------------------------------------------------------
castor::gc::SynchronizationThread::SynchronizationThread(int startDelay) :
m_startDelay(startDelay) { };
m_startDelay(startDelay), m_chunkInterval(DEFAULT_CHUNKINTERVAL),
m_chunkSize(DEFAULT_CHUNKSIZE), m_disableStagerSync(DEFAULT_DISABLESTAGERSYNC)
{};
//-----------------------------------------------------------------------------
// syncFile
//-----------------------------------------------------------------------------
bool castor::gc::SynchronizationThread::syncFile
(const std::string &mountPoint,
const std::string &path,
const char* fileName,
std::map<std::string, std::map<u_signed64, std::string> > &paths) {
// Ignore non regular files and files closed too recently (< 1mn)
// This protects in particular recently recalled files by giving time
// to the stager DB to create the associated DiskCopy. Otherwise,
// we would have a time window where the file exist on disk and can
// be considered by us, while it does not exist on the stager. Thus
// we would drop it
struct stat64 filebuf;
std::string filepath (path + "/" + fileName);
if (stat64(filepath.c_str(), &filebuf) < 0) {
return false;
} else if (!(filebuf.st_mode & S_IFREG)) {
return false; // not a file
} else if (filebuf.st_mtime > time(NULL) - 600) {
return false;
}
// Extract the nameserver host and diskcopy id from the filename
std::pair<std::string, u_signed64> fid;
try {
fid = diskCopyIdFromFileName(fileName);
} catch (castor::exception::Exception& e) {
// "Ignoring filename that does not conform to castor naming
// conventions"
castor::dlf::Param params[] =
{castor::dlf::Param("Filename", fileName)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_DEBUG, 39, 1, params);
return false;
}
paths[fid.first][fid.second] = filepath;
// In the case of a large number of files, synchronize them in
// chunks so to not overwhelming central services
return checkAndSyncChunk(fid.first, paths, mountPoint, m_chunkSize);
}
//-----------------------------------------------------------------------------
// Run
......@@ -64,21 +110,17 @@ void castor::gc::SynchronizationThread::run(void*) {
// "Starting synchronization thread"
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_SYSTEM, 18);
sleep(m_startDelay);
// Get the synchronization interval and chunk size
unsigned int chunkInterval = DEFAULT_CHUNKINTERVAL;
unsigned int chunkSize = DEFAULT_CHUNKSIZE;
bool disableStagerSync = DEFAULT_DISABLESTAGERSYNC;
readConfigFile(&chunkInterval, &chunkSize, &disableStagerSync, true);
// sleep a bit if there is a startDelay
sleep(m_startDelay);
// Endless loop
for (;;) {
// Get the synchronization interval and chunk size these may have changed
// since the last iteration
readConfigFile(&chunkInterval, &chunkSize, &disableStagerSync);
if (chunkInterval <= 0) {
// Get the new configuration (eg. synch interval and chunk)
// as these may have changed since the last iteration
readConfigFile();
if (m_chunkInterval <= 0) {
// just do nothing if interval = 0
sleep(300);
return;
......@@ -90,7 +132,7 @@ void castor::gc::SynchronizationThread::run(void*) {
if (getconfent_multi("DiskManager", "MountPoints", 1, &fs, &nbFs) < 0) {
// "Unable to retrieve mountpoints, giving up with synchronization"
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, 23);
sleep(chunkInterval);
sleep(m_chunkInterval);
continue;
}
......@@ -111,7 +153,7 @@ void castor::gc::SynchronizationThread::run(void*) {
{castor::dlf::Param("FileSystem", fs[fsIt]),
castor::dlf::Param("Error", strerror(errno))};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, 24, 2, params);
sleep(chunkInterval);
sleep(m_chunkInterval);
continue;
}
struct dirent *dir;
......@@ -156,60 +198,8 @@ void castor::gc::SynchronizationThread::run(void*) {
// List files in this directory
struct dirent *file;
while ((file = readdir(files))) {
// Ignore non regular files and files closed too recently (< 1mn)
// This protects in particular recently recalled files by giving time
// to the stager DB to create the associated DiskCopy. Otherwise,
// we would have a time window where the file exist on disk and can
// be considered by us, while it does not exist on the stager. Thus
// we would drop it
struct stat64 filebuf;
std::string filepath (*it + "/" + file->d_name);
if (stat64(filepath.c_str(), &filebuf) < 0) {
continue;
} else if (!(filebuf.st_mode & S_IFREG)) {
continue; // not a file
} else if (filebuf.st_mtime > time(NULL) - 600) {
continue;
}
// Extract the nameserver host and diskcopy id from the filename
std::pair<std::string, u_signed64> fid;
try {
fid = diskCopyIdFromFileName(file->d_name);
} catch (castor::exception::Exception& e) {
// "Ignoring filename that does not conform to castor naming
// conventions"
castor::dlf::Param params[] =
{castor::dlf::Param("Filename", file->d_name)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_DEBUG, 39, 1, params);
continue;
}
paths[fid.first][fid.second] = *it + "/" + file->d_name;
// In the case of a large number of files, synchronize them in
// chunks so to not overwhelming central services
if (paths[fid.first].size() >= chunkSize) {
try {
// "Synchronizing files with nameserver and stager catalog"
castor::dlf::Param params[] =
{castor::dlf::Param("NbFiles", paths[fid.first].size()),
castor::dlf::Param("Nameserver", fid.first)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_DEBUG, 31, 2, params);
synchronizeFiles(fid.first, paths[fid.first], disableStagerSync, fs[fsIt]);
paths[fid.first].clear();
} catch (castor::exception::Exception& e) {
// "Unexpected exception caught in synchronizeFiles"
castor::dlf::Param params[] =
{castor::dlf::Param("ErrorCode", e.code()),
castor::dlf::Param("ErrorMessage", e.getMessage().str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, 40, 2, params);
}
// sleep a bit before next roudn to not overwhelm central services
sleep(chunkInterval);
bool didSync = syncFile(fs[fsIt], *it, file->d_name, paths);
if (didSync) {
// after we've slept, we should flush the hidden cache inside the readdir
// call. Otherwise, our next files will have as mtime the one before our
// sleep, and this means that the check on the age is useless
......@@ -217,33 +207,12 @@ void castor::gc::SynchronizationThread::run(void*) {
// repositioning of the dir stream to its current place does the trick
seekdir(files, telldir(files));
}
}
}
closedir(files);
}
// Synchronize the remaining files not yet checked for this filesystem
for (std::map<std::string, std::map<u_signed64, std::string> > ::const_iterator it2 =
paths.begin();
it2 != paths.end();
it2++) {
try {
if (it2->second.size() > 0) {
// "Synchronizing files with nameserver and stager catalog"
castor::dlf::Param params[] =
{castor::dlf::Param("NbFiles", it2->second.size()),
castor::dlf::Param("Nameserver", it2->first)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_DEBUG, 31, 2, params);
synchronizeFiles(it2->first, it2->second, disableStagerSync, fs[fsIt]);
paths[it2->first].clear();
sleep(chunkInterval);
}
} catch (castor::exception::Exception& e) {
// "Unexpected exception caught in synchronizeFiles"
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, 40, 0, 0);
sleep(chunkInterval);
}
}
syncAllChunks(fs[fsIt], paths);
// Go to next filesystem
free(fs[fsIt]);
......@@ -255,16 +224,55 @@ void castor::gc::SynchronizationThread::run(void*) {
}
}
//-----------------------------------------------------------------------------
// syncAllChunks
//-----------------------------------------------------------------------------
void castor::gc::SynchronizationThread::syncAllChunks
(const std::string &mountPoint,
std::map<std::string, std::map<u_signed64, std::string> > &paths) {
for (std::map<std::string, std::map<u_signed64, std::string> > ::const_iterator it =
paths.begin();
it != paths.end();
it++) {
checkAndSyncChunk(it->first, paths, mountPoint, 0);
}
}
//-----------------------------------------------------------------------------
// checkAndSyncChunk
//-----------------------------------------------------------------------------
bool castor::gc::SynchronizationThread::checkAndSyncChunk
(const std::string &nameServer,
std::map<std::string, std::map<u_signed64, std::string> > &paths,
const std::string &mountPoint,
u_signed64 minimumNbFiles) {
try {
if (paths[nameServer].size() >= minimumNbFiles) {
// "Synchronizing files with nameserver and stager catalog"
castor::dlf::Param params[] =
{castor::dlf::Param("NbFiles", paths[nameServer].size()),
castor::dlf::Param("Nameserver", nameServer)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_DEBUG, 31, 2, params);
synchronizeFiles(nameServer, paths[nameServer], mountPoint);
paths[nameServer].clear();
sleep(m_chunkInterval);
return true;
}
} catch (castor::exception::Exception& e) {
// "Unexpected exception caught in synchronizeFiles"
castor::dlf::Param params[] =
{castor::dlf::Param("ErrorCode", e.code()),
castor::dlf::Param("ErrorMessage", e.getMessage().str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, 40, 2, params);
sleep(m_chunkInterval);
}
return false;
}
//-----------------------------------------------------------------------------
// ReadConfigFile
//-----------------------------------------------------------------------------
void castor::gc::SynchronizationThread::readConfigFile
(unsigned int *chunkInterval,
unsigned int *chunkSize,
bool *disableStagerSync,
bool firstTime)
{
void castor::gc::SynchronizationThread::readConfigFile(bool firstTime) {
// Synchronization interval
char* value;
......@@ -275,20 +283,20 @@ void castor::gc::SynchronizationThread::readConfigFile
(value = getconfent("GC", "ChunkInterval", 0))) {
intervalnew = atoi(value);
if (intervalnew >= 0) {
if ((unsigned int)intervalnew != *chunkInterval) {
*chunkInterval = intervalnew;
if ((unsigned int)intervalnew != m_chunkInterval) {
m_chunkInterval = intervalnew;
if (!firstTime) {
// "New chunk interval"
castor::dlf::Param params[] =
{castor::dlf::Param("Interval", *chunkInterval)};
{castor::dlf::Param("Interval", m_chunkInterval)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_SYSTEM, 37, 1, params);
}
}
} else {
*chunkInterval = DEFAULT_CHUNKINTERVAL;
m_chunkInterval = DEFAULT_CHUNKINTERVAL;
// "Invalid GC/ChunkInterval option, using default"
castor::dlf::Param params[] =
{castor::dlf::Param("Default", *chunkInterval)};
{castor::dlf::Param("Default", m_chunkInterval)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, 38, 1, params);
}
}
......@@ -299,20 +307,20 @@ void castor::gc::SynchronizationThread::readConfigFile
(value = getconfent("GC", "ChunkSize", 0))) {
chunkSizenew = atoi(value);
if (chunkSizenew >= 0) {
if (*chunkSize != (unsigned int)chunkSizenew) {
*chunkSize = (unsigned int)chunkSizenew;
if (m_chunkSize != (unsigned int)chunkSizenew) {
m_chunkSize = (unsigned int)chunkSizenew;
if (!firstTime) {
// "New synchronization chunk size"
castor::dlf::Param params[] =
{castor::dlf::Param("ChunkSize", *chunkSize)};
{castor::dlf::Param("ChunkSize", m_chunkSize)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_SYSTEM, 22, 1, params);
}
}
} else {
*chunkSize = DEFAULT_CHUNKSIZE;
m_chunkSize = DEFAULT_CHUNKSIZE;
// "Invalid GC/ChunkSize option, using default"
castor::dlf::Param params[] =
{castor::dlf::Param("Default", *chunkSize)};
{castor::dlf::Param("Default", m_chunkSize)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, 20, 1, params);
}
}
......@@ -320,9 +328,9 @@ void castor::gc::SynchronizationThread::readConfigFile
// Disabling of stager synchronization
if ((value = getenv("GC_DISABLESTAGERSYNC")) ||
(value = getconfent("GC", "DisableStagerSync", 0))) {
*disableStagerSync = DEFAULT_DISABLESTAGERSYNC;
m_disableStagerSync = DEFAULT_DISABLESTAGERSYNC;
if (!strcasecmp(value, "yes")) {
*disableStagerSync = true;
m_disableStagerSync = true;
} else if (strcasecmp(value, "no")) {
castor::exception::Exception e(EINVAL);
e.getMessage() << "Invalid option for DisableStagerSync: '" << value
......@@ -335,8 +343,8 @@ void castor::gc::SynchronizationThread::readConfigFile
if (firstTime) {
// "Synchronization configuration"
castor::dlf::Param params[] =
{castor::dlf::Param("ChunkInterval", *chunkInterval),
castor::dlf::Param("ChunkSize", *chunkSize)};
{castor::dlf::Param("ChunkInterval", m_chunkInterval),
castor::dlf::Param("ChunkSize", m_chunkSize)};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_SYSTEM, 30, 2, params);
}
}
......@@ -432,7 +440,7 @@ castor::gc::SynchronizationThread::fileIdFromFilePath(std::string filePath)
// getFilesBeingWrittenTo
//-----------------------------------------------------------------------------
std::set<std::string>
castor::gc::SynchronizationThread::getFilesBeingWrittenTo(char* mountPoint)
castor::gc::SynchronizationThread::getFilesBeingWrittenTo(const std::string &mountPoint)
{
std::set<std::string> files;
// loop through the /proc/*/fd directories
......@@ -471,7 +479,7 @@ castor::gc::SynchronizationThread::getFilesBeingWrittenTo(char* mountPoint)
S_ISLNK(fileStat.st_mode) &&
((fileStat.st_mode & S_IWUSR) == S_IWUSR) &&
(readlink(filePath.str().c_str(), buffer, sizeof(buffer)) > 0)) {
if (strncmp(buffer, mountPoint, std::min(1024,int(strlen(mountPoint))))==0) {
if (strncmp(buffer, mountPoint.c_str(), std::min(1024,int(mountPoint.size())))==0) {
files.insert(std::string(buffer));
}
}
......@@ -486,10 +494,9 @@ castor::gc::SynchronizationThread::getFilesBeingWrittenTo(char* mountPoint)
// synchronizeFiles
//-----------------------------------------------------------------------------
void castor::gc::SynchronizationThread::synchronizeFiles
(std::string nameServer,
(const std::string &nameServer,
const std::map<u_signed64, std::string> &paths,
bool disableStagerSync,
char* mountPoint) throw() {
const std::string &mountPoint) throw() {
// Make a copy of the disk copy id and file path containers so that they can
// be modified safely
......@@ -553,7 +560,7 @@ void castor::gc::SynchronizationThread::synchronizeFiles
// The stager synchronization exists only to compensate from physical
// file losses on the diskservers or bugs: as such, we deliberately
// slow it down by a factor of 10 to not overwhelm the stager database.
if (!disableStagerSync && (rand()/(RAND_MAX + 1.0) < 0.1)) {
if (!m_disableStagerSync && (rand()/(RAND_MAX + 1.0) < 0.1)) {
orphans = gcSvc->stgFilesDeleted(dcIds, nameServer);
// Remove orphaned files
......
......@@ -75,18 +75,10 @@ namespace castor {
/**
* Read config file values
* @param chunkInterval a pointer to the chunk interval value
* @param chunkSize a pointer to the chunk size value
* @param disableStagerSync a pointer to the boolean commanding disabling
* of the synchronization with the stager
* @param firstTime whether this is a first call. used only for logging
* purposes
*/
void readConfigFile(unsigned int *chunkInterval,
unsigned int *chunkSize,
bool *disableStagerSync,
bool firstTime = false)
;
void readConfigFile(bool firstTime = false);
/**
* Parse a fileName and extract the diskCopyId
......@@ -96,8 +88,7 @@ namespace castor {
* syntax
*/
std::pair<std::string, u_signed64>
diskCopyIdFromFileName(std::string fileName)
;
diskCopyIdFromFileName(std::string fileName);
/**
* Parse a filePath and extract the fileId
......@@ -106,34 +97,74 @@ namespace castor {
* @throw exception in case the file name is not matching the expected
* syntax
*/
u_signed64 fileIdFromFilePath(std::string filePath)
;
u_signed64 fileIdFromFilePath(std::string filePath);
/**
* gets a list of files open for write in the given mountPoint
* @param mountPoint the mountPoint to be considered
* @throw exception in case of error
*/
std::set<std::string> getFilesBeingWrittenTo(char* mountPoint)
;
std::set<std::string> getFilesBeingWrittenTo(const std::string &mountPoint);
/**
* Synchronizes a given file
* @param mountPoint the mountPoint or DataPool concerned
* @param path the path where the file lies inside the mountPoint,
* or empty string for DataPools
* @param fileName the file name
* @param paths the differents "chunks", that is list of files, sorted by namespace
* @return whether synchronization took place
*/
bool syncFile(const std::string &mountPoint,
const std::string &path,
const char* fileName,
std::map<std::string, std::map<u_signed64, std::string> > &paths);
/**
* Synchronizes all chunks present in paths
* @param mountPoint the mountPoint or DataPool concerned
* @param paths the differents "chunks", that is list of files, sorted by namespace
*/
void syncAllChunks(const std::string &mountPoint,
std::map<std::string, std::map<u_signed64, std::string> > &paths);
/**
* Checks whether to synchronize a chunk for the given nameserver
* and does it if needed
* @param nameServer the nameServer concerned
* @param paths the differents "chunks", that is list of files, sorted by namespace
* @param mountPoint the mountPoint or DataPool concerned
* @param minimumNbFiles the minimumNbFiles we should have in the chunk to
* got for synchronization
* @return whether synchronization took place
*/
bool checkAndSyncChunk(const std::string &nameServer,
std::map<std::string, std::map<u_signed64, std::string> > &paths,
const std::string &mountPoint,
u_signed64 minimumNbFiles);
/**
* Synchronizes a list of files from a given filesystem with the nameserver and stager catalog
* @param nameServer the nameserver to use
* @param paths a map giving the full file name for each diskCopyId to be checked
* @param disableStagerSync whether to disable the stager synchronization
* @param mountPoint the mountPoint of the filesystem on which the files reside
*/
void synchronizeFiles(std::string nameServer,
void synchronizeFiles(const std::string &nameServer,
const std::map<u_signed64, std::string> &paths,
bool disableStagerSync,
char* mountPoint)
const std::string &mountPoint)
throw();
private:
/// The number of seconds to delay the first invocation of the run method
int m_startDelay;
/// The number of seconds to wait between two chunks' synchronization
unsigned int m_chunkInterval;
/// the size of a chunk, aka the maximum number of files to synchronize in one go
unsigned int m_chunkSize;
/// Whether stager synchronization should be disabled;
bool m_disableStagerSync;
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment