Commit c948dd43 authored by Sebastien Ponce's avatar Sebastien Ponce
Browse files

Implemented ceph synchronization

parent 78aaa966
......@@ -302,7 +302,7 @@
# you can use a list like /your/mount/point/01/ /your/mount/point/02/ ...
#DiskManager MountPoints
# List of dataPools the DiskManager daemon should monitor
# List of dataPools the DiskManager daemon should monitor, garbage collect and synchronize
# you can use a list like pool1 pool2 ...
#DiskManager DataPools
......@@ -335,6 +335,13 @@
# stager catalog. The synchronization with the nameserver is not affected.
#GC DisableStagerSync no
# The period during which new files won't be considered for synchronization.
# This protects in particular files being created (e.g. ongoing recalls) by giving
# the stager DB time to create the associated DiskCopy. Otherwise, there would
# be a time window where the file exists on disk and can be considered by
# synchronization while it does not yet exist in the stager, and thus it may be dropped.
# Default is one day (86400 seconds)
#GC SyncGracePeriod 86400
## Security Configuration ######################################################
......
......@@ -29,7 +29,8 @@ include_directories(${RADOS_INCLUDE_DIR})
set(GCBIN_SRCS
GcDaemon.cpp
DeletionThread.cpp
SynchronizationThread.cpp)
SynchronizationThread.cpp
CephGlobals.cpp)
add_executable (gcd ${GCBIN_SRCS})
target_link_libraries (gcd castorcommon castordlf castorns castorclient ${RADOS_LIBS})
install (TARGETS gcd DESTINATION ${CASTOR_DEST_BIN_DIR})
......
/******************************************************************************
* CephGlobals.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* couple of global maps needed for interfacing with ceph libraries.
* Basically a cache of connections to ceph
*
* @author Dennis Waldron
*****************************************************************************/
#include "castor/gc/CephGlobals.hpp"
/// Global caches of ceph handles, keyed by pool name.
/// Entries are added by castor::gc::getRadosStriper the first time a pool is
/// requested and are then reused by later calls.
/// NOTE(review): no locking is visible around these maps in this file;
/// confirm that they are never accessed from several threads concurrently.
/// striper object of each ceph pool
std::map<std::string, libradosstriper::RadosStriper*> g_radosStripers;
/// IoCtx object of each ceph pool
std::map<std::string, librados::IoCtx*> g_ioCtx;
/**
 * Gives access to the IoCtx object of a given ceph pool.
 * The striper of the pool is requested first, as this is what creates and
 * caches the IoCtx when the pool is seen for the first time.
 * @param pool the name of the ceph pool
 * @return the cached IoCtx of the pool, or 0 if no striper could be obtained
 */
librados::IoCtx* castor::gc::getRadosIoCtx(std::string pool) {
  // no striper means that the connection to the pool could not be set up
  if (castor::gc::getRadosStriper(pool) == 0) {
    return 0;
  }
  return g_ioCtx[pool];
}
/**
 * Returns the RadosStriper object of the given ceph pool. On first use for a
 * pool, a connection to ceph is opened, an IoCtx and a striper are created
 * and both are cached in the global maps.
 * @param pool the name of the ceph pool
 * @return the striper of the pool, or 0 if the connection, the IoCtx or the
 * striper could not be created. Nothing is cached in that case, so that the
 * next call tries again from scratch
 */
libradosstriper::RadosStriper* castor::gc::getRadosStriper(std::string pool) {
  std::map<std::string, libradosstriper::RadosStriper*>::iterator it =
    g_radosStripers.find(pool);
  if (it == g_radosStripers.end()) {
    // we need to create a new radosStriper
    librados::Rados cluster;
    int rc = cluster.init(0);
    if (rc) return 0;
    rc = cluster.conf_read_file(NULL);
    if (rc) {
      cluster.shutdown();
      return 0;
    }
    cluster.conf_parse_env(NULL);
    rc = cluster.connect();
    if (rc) {
      cluster.shutdown();
      return 0;
    }
    // The IoCtx is kept local until the striper is successfully created, so
    // that every failure path can release it. Erasing a map entry would only
    // drop the pointer and leak the object.
    librados::IoCtx *ioctx = new librados::IoCtx();
    rc = cluster.ioctx_create(pool.c_str(), *ioctx);
    if (rc != 0) {
      delete ioctx;
      return 0;
    }
    libradosstriper::RadosStriper *newStriper = new libradosstriper::RadosStriper;
    rc = libradosstriper::RadosStriper::striper_create(*ioctx, newStriper);
    if (rc != 0) {
      delete newStriper;
      delete ioctx;
      return 0;
    }
    // everything went fine, publish both objects in the global caches
    g_ioCtx[pool] = ioctx;
    it = g_radosStripers.insert(std::pair<std::string, libradosstriper::RadosStriper*>
                                (pool, newStriper)).first;
  }
  return it->second;
}
/******************************************************************************
* CephGlobals.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* couple of global maps needed for interfacing with ceph libraries.
* Basically a cache of connections to ceph
*
* @author castor dev team
*****************************************************************************/
#pragma once
#include "castor/gc/CephGlobals.hpp"
#include <rados/librados.hpp>
#include <radosstriper/libradosstriper.hpp>
namespace castor {
namespace gc {
/**
 * gets an IoCtx object for a given pool
 * @param pool the name of the ceph pool
 * @return a pointer to the IoCtx of the pool, or 0 if the striper of
 * the pool could not be obtained
 */
librados::IoCtx* getRadosIoCtx(std::string pool);
/**
 * gets a Striper object for a given pool
 * @param pool the name of the ceph pool
 * @return a pointer to the striper of the pool, or 0 if it could not be created
 */
libradosstriper::RadosStriper* getRadosStriper(std::string pool);
}
}
......@@ -25,6 +25,7 @@
// Include files
#include "castor/gc/DeletionThread.hpp"
#include "castor/gc/CephGlobals.hpp"
#include "castor/Services.hpp"
#include "castor/Constants.hpp"
#include "castor/stager/IGCSvc.hpp"
......@@ -274,40 +275,6 @@ void castor::gc::DeletionThread::run(void*) {
} // End of loop
}
/// global variables holding stripers for each ceph pool
std::map<std::string, libradosstriper::RadosStriper*> g_radosStripers;

/**
 * Returns the RadosStriper object of the given ceph pool, creating and
 * caching it in g_radosStripers on first use.
 * @param pool the name of the ceph pool
 * @return the striper of the pool, or 0 if the connection to ceph, the IoCtx
 * or the striper could not be created. Nothing is cached in that case
 */
static libradosstriper::RadosStriper* getRadosStriper(std::string pool) {
  std::map<std::string, libradosstriper::RadosStriper*>::iterator it =
    g_radosStripers.find(pool);
  if (it == g_radosStripers.end()) {
    // we need to create a new radosStriper
    librados::Rados cluster;
    int rc = cluster.init(0);
    if (rc) return 0;
    rc = cluster.conf_read_file(NULL);
    if (rc) {
      cluster.shutdown();
      return 0;
    }
    cluster.conf_parse_env(NULL);
    rc = cluster.connect();
    if (rc) {
      cluster.shutdown();
      return 0;
    }
    // NOTE(review): ioctx is a local object and goes away at the end of this
    // branch; presumably the striper keeps its own reference - TODO confirm
    librados::IoCtx ioctx;
    rc = cluster.ioctx_create(pool.c_str(), ioctx);
    if (rc != 0) return 0;
    libradosstriper::RadosStriper *newStriper = new libradosstriper::RadosStriper;
    rc = libradosstriper::RadosStriper::striper_create(ioctx, newStriper);
    if (rc != 0) {
      // do not leak the striper object that could not be initialized
      delete newStriper;
      return 0;
    }
    it = g_radosStripers.insert(std::pair<std::string, libradosstriper::RadosStriper*>
                                (pool, newStriper)).first;
  }
  return it->second;
}
//-----------------------------------------------------------------------------
// gcRemoveFilePath
//-----------------------------------------------------------------------------
......
......@@ -140,7 +140,7 @@ castor::gc::GcDaemon::GcDaemon(std::ostream &stdOut, std::ostream &stdErr,
{ 23, "Unable to retrieve mountpoints, giving up with synchronization" },
{ 24, "Could not list filesystem directories, giving up with filesystem's synchronisation" },
{ 25, "Could not list filesystem subdirectory, ignoring it for synchronization" },
{ 27, "Deleting local file which is no longer in the nameserver" },
{ 27, "Deleting file which is no longer in the nameserver" },
{ 28, "Deletion of orphaned local file failed" },
{ 29, "Memory allocation failure" },
{ 30, "Synchronization configuration" },
......@@ -155,6 +155,10 @@ castor::gc::GcDaemon::GcDaemon(std::ostream &stdOut, std::ostream &stdErr,
{ 40, "Unexpected exception caught in synchronizeFiles" },
{ 41, "Failed to stat file" },
{ 43, "Could not get fileid from filepath, giving up for this file" },
{ 44, "Unable to get RadosStriper object. Ignoring file" },
{ 45, "Unable to retrieve IoCtx for DataPool" },
{ 46, "New synchronization grace period" },
{ 47, "Invalid GC/SyncGracePeriod option, using default" },
{ -1, "" }};
dlfInit(messages);
}
This diff is collapsed.
......@@ -100,59 +100,62 @@ namespace castor {
u_signed64 fileIdFromFilePath(std::string filePath);
/**
* gets a list of files open for write in the given mountPoint
* @param mountPoint the mountPoint to be considered
* @throw exception in case of error
* synchronizes all fileSystems
*/
std::set<std::string> getFilesBeingWrittenTo(const std::string &mountPoint);
void syncFileSystems();
/**
* synchronizes all dataPools
*/
void syncDataPools();
/**
* Synchronizes a given file
* @param mountPoint the mountPoint or DataPool concerned
* Synchronizes a given local file
* @param path the path where the file lies inside the mountPoint,
* or empty string for DataPools
* @param fileName the file name
* @param paths the different "chunks", that is list of files, sorted by namespace
* @return whether synchronization took place
*/
bool syncFile(const std::string &mountPoint,
const std::string &path,
const char* fileName,
std::map<std::string, std::map<u_signed64, std::string> > &paths);
bool syncLocalFile(const std::string &path,
const char* fileName,
std::map<std::string, std::map<u_signed64, std::string> > &paths);
/**
* Synchronizes a given ceph file
* @param fileName the name of the file
* @param paths the different "chunks", that is list of files, sorted by namespace
* @return whether synchronization took place
*/
bool syncCephFile(const std::string fileName,
std::map<std::string, std::map<u_signed64, std::string> > &paths);
/**
* Synchronizes all chunks present in paths
* @param mountPoint the mountPoint or DataPool concerned
* @param paths the different "chunks", that is list of files, sorted by namespace
*/
void syncAllChunks(const std::string &mountPoint,
std::map<std::string, std::map<u_signed64, std::string> > &paths);
void syncAllChunks(std::map<std::string, std::map<u_signed64, std::string> > &paths);
/**
* Checks whether to synchronize a chunk for the given nameserver
* and does it if needed
* @param nameServer the nameServer concerned
* @param paths the different "chunks", that is list of files, sorted by namespace
* @param mountPoint the mountPoint or DataPool concerned
* @param minimumNbFiles the minimum number of files we should have in the chunk to
* go for synchronization
* @return whether synchronization took place
*/
bool checkAndSyncChunk(const std::string &nameServer,
std::map<std::string, std::map<u_signed64, std::string> > &paths,
const std::string &mountPoint,
u_signed64 minimumNbFiles);
/**
* Synchronizes a list of files from a given filesystem with the nameserver and stager catalog
* @param nameServer the nameserver to use
* @param paths a map giving the full file name for each diskCopyId to be checked
* @param mountPoint the mountPoint of the filesystem on which the files reside
*/
void synchronizeFiles(const std::string &nameServer,
const std::map<u_signed64, std::string> &paths,
const std::string &mountPoint)
const std::map<u_signed64, std::string> &paths)
throw();
private:
......@@ -163,6 +166,9 @@ namespace castor {
unsigned int m_chunkInterval;
/// the size of a chunk, aka the maximum number of files to synchronize in one go
unsigned int m_chunkSize;
/// the grace period for new files. That is the period during which they are not
/// considered for synchronization
unsigned int m_gracePeriod;
/// Whether stager synchronization should be disabled;
bool m_disableStagerSync;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment