diff --git a/CMakeLists.txt b/CMakeLists.txt index 6a99f6b4df5008e9d9d6734ecb6dae6dd396ba10..b4565bf9925aa57455c2a4d0adcc99ca326fefff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -223,6 +223,8 @@ add_custom_target(helgrind add_custom_target(shortunittests tests/cta-unitTests COMMAND tests/cta-unitTests-multiProcess + COMMAND echo ${PROJECT_SOURCE_DIR}/python/eosfstgcd/test_ctafstgcd.py + COMMAND ${PROJECT_SOURCE_DIR}/python/eosfstgcd/test_ctafstgcd.py DEPENDS tests/cta-unitTests tests/cta-unitTests-multiProcess COMMENT "Running unit tests" VERBATIM) diff --git a/python/eosfstgcd/.gitignore b/python/eosfstgcd/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..0d20b6487c61e7d1bde93acf4a14b7a89083a16d --- /dev/null +++ b/python/eosfstgcd/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/python/eosfstgcd/CMakeLists.txt b/python/eosfstgcd/CMakeLists.txt index e85e5ddbae2e9c8af3cff719664fff5545f27b7a..781060864fd6a9b48dcf2246171619f663dddf38 100644 --- a/python/eosfstgcd/CMakeLists.txt +++ b/python/eosfstgcd/CMakeLists.txt @@ -15,7 +15,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. 
cmake_minimum_required (VERSION 2.6) -install (PROGRAMS cta-fst-gcd DESTINATION usr/bin) +install (PROGRAMS ctafstgcd.py DESTINATION usr/bin/cta-fst-gcd) install (FILES ${CMAKE_CURRENT_SOURCE_DIR}/cta-fst-gcd.1cta DESTINATION /usr/share/man/man1) install (FILES cta-fst-gcd.service DESTINATION /etc/systemd/system) install (FILES cta-fst-gcd.conf.example DESTINATION /etc/cta) diff --git a/python/eosfstgcd/cta-fst-gcd b/python/eosfstgcd/cta-fst-gcd deleted file mode 100755 index 358a983c12c62b545a02a36194028946e9db0f19..0000000000000000000000000000000000000000 --- a/python/eosfstgcd/cta-fst-gcd +++ /dev/null @@ -1,319 +0,0 @@ -#!/bin/python - -# The CERN Tape Archive (CTA) project -# Copyright (C) 2015 CERN -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import argparse -import ConfigParser -import datetime -import getpass -import logging -import logging.config -import os -import re -import socket -import subprocess -import sys -import time - -class UserError(Exception): - pass - -class Gc: - '''Implements the cta-fst-gcd daemon that runs on an EOS FST and garbage - collects EOS disk copies that have been safely stored to tape. - - The cta-fst-gcd daemon scans across every single EOS disk file on an FST. - A file is garbage collected if: - - * The amount of free space on the corresponding file system is - considered too low. 
- - * The file is considered old enough to be garbage collected. - - The cta-fst-gcd daemon garbage collects an EOS disk file by extracting the - hexadecimal EOS file identifier from the local disk filename and then - running eos stagerm fxid:<fid-hex>.''' - - def get_env_mgmhost(self): - logger = logging.getLogger('gc') - - if "EOS_MGM_URL" in self.env: - mgm_url = self.env["EOS_MGM_URL"] - mgmhost = re.sub("^x?root://", "", mgm_url) - logger.info("Determined MGM host {} from EOS_MGM_URL".format(mgmhost)) - return mgmhost - - logger.info("The environment variable EOS_MGM_URL is not set") - - def get_syconfig_mgmhost(self): - logger = logging.getLogger('gc') - eos_env = '/etc/sysconfig/eos_env' - - if os.path.isfile(eos_env) and os.access(eos_env, os.W_OK): - sysconfig_file = open(eos_env, "r") - for line in sysconfig_file: - mgmhostline = re.match("^EOS_MGM_HOST=.*", line) - if mgmhostline: - splitmgmhostline = mgmhostline.group(0).split('=') - if 2 == len(splitmgmhostline): - mgmhost = splitmgmhostline[1] - logger.info("Extracted MGM host {} from {}".format(mgmhost, eos_env)) - return mgmhost - - logger.info("Could not determine MGM host from {}".format(eos_env)) - - def setmgmhost(self): - self.mgmhost = self.get_env_mgmhost() - if self.mgmhost: - return - - self.mgmhost = self.get_syconfig_mgmhost() - if self.mgmhost: - return - - raise UserError("Could not determine the MGM host from EOS_MGM_URL or from /etc/sysconfig/eos_env") - - def configuredummylogging(self): - config = { - 'version': 1, - 'disable_existing_loggers': False, - 'loggers': { - 'gc' : { - 'level': 'INFO' - } - } - } - logging.config.dictConfig(config) - - def configurereallogging(self): - if None == self.logfile: - raise Exception("Cannot configure file based logging because the log file path has not been set") - - loggingdir = os.path.dirname(self.logfile) - if not os.path.isdir(loggingdir): - raise UserError("The logging directory {} is not a directory or does not exist".format(loggingdir)) 
- if not os.access(loggingdir, os.W_OK): - raise UserError("The logging directory {} cannot be written to by {}".format(loggingdir, self.programname)) - - config = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'stdout': { - 'format': '%(asctime)s.%(msecs)03d000 %(levelname)s ' + self.programname + - ': LVL="%(levelname)s" PID="%(process)d" TID="%(process)d" MSG="%(message)s"', - 'datefmt': '%Y/%m/%d %H:%M:%S' - } - }, - 'handlers': { - 'logfile': { - 'level': 'INFO', - 'formatter': 'stdout', - 'class': 'logging.handlers.TimedRotatingFileHandler', - 'filename' : self.logfile, - 'when' : 'midnight' - } - }, - 'loggers': { - 'gc' : { - 'handlers': ['logfile'], - 'level': 'INFO' - } - } - } - # Failing to configure the logging system is usually a user error - try: - logging.config.dictConfig(config) - except Exception as err: - raise UserError(err) - - def configurelogging(self): - if None == self.logfile: - self.configuredummylogging() - else: - self.configurereallogging() - - def __init__(self, env, logfile): - self.programname = 'cta-fst-gcd' - self.env = env - self.conffilepath = '/etc/cta/cta-fst-gcd.conf' - self.logfile = logfile - self.fqdn = socket.getfqdn() - self.localfilesystempaths = [] - self.nbfilesconsideredsincelastreport = 0 - self.nbfilesbeforereport = 10000 - self.mgmhost = None - - def eosfsls(self): - logger = logging.getLogger('gc') - mgmurl = "root://{}".format(self.mgmhost) - cmd = "eos -r 0 0 {} fs ls -m".format(mgmurl) - env = os.environ.copy() - env["XrdSecPROTOCOL"] = "sss" - process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) - stdout,stderr = process.communicate() - - result = [] - - if 0 != process.returncode: - returncodestr = os.strerror(process.returncode) - stderrstr = stderr.replace('\n', ' ').replace('\r', '').strip() - logger.error("Failed to execute {}: returncode={} returncodestr='{}' stderr='{}'" - .format(cmd, process.returncode, returncodestr, 
stderrstr)) - else: - lines = stdout.splitlines(); - for l in lines: - linedict = {} - pairs = l.split() - for p in pairs: - splitpair = p.split('=') - if 2 == len(splitpair): - linedict[splitpair[0]] = splitpair[1] - if linedict: - result.append(linedict) - - return result - - def eosstagerrm(self, fxid, subdir, freebytes): - logger = logging.getLogger('gc') - mgmurl = "root://{}".format(self.mgmhost) - cmd = "eos {} stagerrm fxid:{}".format(mgmurl, fxid) - env = os.environ.copy() - env["XrdSecPROTOCOL"] = "sss" - process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) - stdout,stderr = process.communicate() - if 0 == process.returncode: - logger.info("minfreebytes={} subdir={} freebytesbefore={} executed='{}'".format(self.minfreebytes, subdir, freebytes, cmd)) - - def processfile(self, subdir, fstfile): - statvfs = os.statvfs(subdir) - freebytes = statvfs.f_frsize * statvfs.f_bavail - spaceshouldefreed = freebytes < self.minfreebytes - - if spaceshouldefreed: - fullpath = os.path.join(subdir,fstfile) - statinfo = os.stat(fullpath) - now = time.time() - agesecs = now - statinfo.st_ctime - if agesecs > self.gcagesecs: - self.eosstagerrm(fstfile, subdir, freebytes) - - self.nbfilesconsideredsincelastreport = self.nbfilesconsideredsincelastreport + 1 - if self.nbfilesbeforereport == self.nbfilesconsideredsincelastreport: - self.nbfilesconsideredsincelastreport = 0 - logger = logging.getLogger('gc') - logger.info('Considered another {} files'.format(self.nbfilesbeforereport)) - - def processfssubdir(self, subdir): - logger = logging.getLogger('gc') - fstfiles = [f for f in os.listdir(subdir) - if re.match('^[0-9A-Fa-f]{8}$', f) and os.path.isfile(os.path.join(subdir, f))] - for fstfile in fstfiles: - self.processfile(subdir, fstfile) - - def processfs(self, path): - fssubdirs = [os.path.join(path, f) for f in os.listdir(path) - if re.match('^[0-9A-Fa-f]{8}$', f) and os.path.isdir(os.path.join(path, f))] - for fssubdir in 
fssubdirs: - self.processfssubdir(fssubdir) - - def logfilesystempaths(self): - logger = logging.getLogger('gc') - logger.info('Number of local file systems is {}'.format(len(self.localfilesystempaths))) - i = 0 - for path in self.localfilesystempaths: - statvfs = os.statvfs(path) - freebytes = statvfs.f_frsize * statvfs.f_bavail - logger.info('Local file system {}: path={} freebytes={}'.format(i, path, freebytes)) - i = i + 1 - - def processallfs(self): - filesystems = self.eosfsls() - newlocalfilesystempaths = [fs["path"] for fs in filesystems if "path" in fs and "host" in fs and self.fqdn == fs["host"]] - if newlocalfilesystempaths != self.localfilesystempaths: - self.localfilesystempaths = newlocalfilesystempaths - self.logfilesystempaths(); - for path in self.localfilesystempaths: - self.processfs(path) - - def logconf(self): - logger = logging.getLogger('gc') - if self.mgmhost: - logger.info("config mgmhost={}".format(self.mgmhost)) - logger.info("config minfreebytes={}".format(self.minfreebytes)) - logger.info("config gcagesecs={}".format(self.gcagesecs)) - - def readconf(self): - if not os.path.isfile(self.conffilepath): - raise UserError("The configuration file {} is not a directory or does not exist".format(self.conffilepath)) - if not os.access(self.conffilepath, os.R_OK): - raise UserError("The configuration file {} cannot be read by {}".format(self.conffilepath, self.programname)) - - config = ConfigParser.ConfigParser() - config.read(self.conffilepath) - - try: - try: - self.mgmhost = config.get('main', 'mgmhost') - except ConfigParser.NoOptionError: - pass - self.minfreebytes = config.getint('main', 'minfreebytes') - self.gcagesecs = config.getint('main', 'gcagesecs') - except ConfigParser.Error as err: - raise UserError("Error with configuration file {}: {}".format(self.conffilepath, err)) - - def run(self): - username = getpass.getuser() - if 'daemon' != username: - raise UserError('{} must be executed as user daemon and not user 
{}'.format(self.programname, username)) - - self.configurelogging() - logger = logging.getLogger('gc') - logger.info('{} started'.format(self.programname)) - logger.info('The fqdn of this machine is {}'.format(self.fqdn)) - self.readconf() - self.logconf() - if not self.mgmhost: - self.setmgmhost() - logger.info('The EOS MGM host is {}'.format(self.mgmhost)) - - minperiod = 300 # In seconds - - while True: - before = time.time() - self.processallfs() - after = time.time() - period = after - before - if period < minperiod: - sleeptime = minperiod - period - logger.debug('Sleeping {} seconds'.format(sleeptime)) - time.sleep(sleeptime) - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("-l", "--logfile", default="/var/log/eos/fst/cta-fst-gcd.log", help="Log file path") - args = parser.parse_args() - - gc = Gc(env=os.environ, logfile=args.logfile) - - try: - gc.run() - except UserError as err: - print "User error: {}".format(err) - -if __name__ == '__main__': - main() diff --git a/python/eosfstgcd/cta-fst-gcd.conf.example b/python/eosfstgcd/cta-fst-gcd.conf.example index 74a32b0a6e52bb36055c5ace4e8a99ea5dc753d9..eab5852edbbb8c814c5a19aa63667cc9f18edaaf 100644 --- a/python/eosfstgcd/cta-fst-gcd.conf.example +++ b/python/eosfstgcd/cta-fst-gcd.conf.example @@ -23,6 +23,10 @@ # There must always be a main section [main] -mgmhost=HOSTNAME.2NDLEVEL.TOPLEVEL +logfile = /var/log/eos/fst/cta-fst-gcd.log ; Path of garbage collector log file +mgmhost = HOSTNAME.2NDLEVEL.TOPLEVEL ; Fully qualified host name of EOS MGM minfreebytes = 10000000000 ; Minimum number of free bytes a filesystem should have -gcagesecs = 7200 ; The age at which a file can be considered for garbage collection +gcagesecs = 7200 ; Age at which a file can be considered for garbage collection +queryperiodsecs = 310 ; Delay in seconds between free space queries to the local file systems +mainloopperiodsecs = 300 ; Period in seconds of the main loop of the cta-fst-gcd daemon +xrdsecssskt = 
/etc/eos.keytab ; Path to simple shared secret to authenticate with EOS MGM diff --git a/python/eosfstgcd/ctafstgcd.py b/python/eosfstgcd/ctafstgcd.py new file mode 100755 index 0000000000000000000000000000000000000000..03df950f5100ef4f5291713e8919f0400d7ff82c --- /dev/null +++ b/python/eosfstgcd/ctafstgcd.py @@ -0,0 +1,433 @@ +#!/bin/python + +# The CERN Tape Archive (CTA) project +# Copyright (C) 2015 CERN +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
class RealDisk:
    '''Thin wrapper around the local disk storage system.

    The garbage collector only talks to the disk through this interface,
    which facilitates unit-testing by allowing a mock to be substituted.'''

    def __init__(self, log):
        '''Stores the logger-like object used to report errors.'''
        self.log = log

    def listdir(self, path):
        '''Returns the names of the entries of the specified directory.'''
        return os.listdir(path)

    def isdir(self, path):
        '''Returns True if the specified path is an existing directory.'''
        return os.path.isdir(path)

    def isfile(self, path):
        '''Returns True if the specified path is an existing regular file.'''
        return os.path.isfile(path)

    def getfilesizeandctime(self, path):
        '''Returns a FileSizeAndCtime describing the specified file.

        Raises an Exception naming the path if the file cannot be stat'ed.'''
        try:
            statinfo = os.stat(path)
        except Exception as err:
            raise Exception("Failed to stat file: path={}: {}".format(path, err))

        filesizeandctime = FileSizeAndCtime()
        filesizeandctime.sizebytes = statinfo.st_size
        filesizeandctime.ctime = statinfo.st_ctime
        return filesizeandctime

    def getfreebytes(self, path):
        '''Returns the number of free bytes available to unprivileged users on
        the file system containing the specified path.

        Returns None, after logging the underlying error, if the file system
        cannot be queried.'''
        try:
            statvfs = os.statvfs(path)
        except Exception as err:
            # Include the cause in the log, otherwise the failure cannot be
            # diagnosed from the log file alone
            self.log.error("Failed to stat VFS for free space: path={}: {}".format(path, err))
            return None

        return statvfs.f_frsize * statvfs.f_bavail
class SpaceTracker:
    '''Calculates the amount of effective free space in the file system of a given
    file or directory by querying the OS and taking into account the pending
    stagerrm requests to the EOS MGM.

    The disk is queried at most once every queryperiodsecs seconds.  Between
    queries the cached value is increased by the size of each file for which a
    stagerrm request has been queued.'''

    def __init__(self, disk, queryperiodsecs, path):
        '''disk must provide getfreebytes(path), queryperiodsecs is the minimum
        delay between two queries and path identifies the file system.'''
        self.disk = disk
        self.queryperiodsecs = queryperiodsecs
        self.path = path
        self.freebytes = None           # None means "not known"
        self.lastquerytimestamp = None  # None means "never queried"

    def stagerrmqueued(self, filesizebytes):
        '''Notifies the tracker that a stagerrm request has been queued for a
        file of the specified size.'''
        # Nothing to adjust if the free space is unknown (no query made yet or
        # the last query failed), the next getfreebytes() will query the disk
        if self.freebytes is None:
            return
        self.freebytes = self.freebytes + filesizebytes

    def getfreebytes(self):
        '''Returns the effective number of free bytes or None if the disk could
        not be queried.'''
        now = time.time()

        # Compare against None explicitly: 0 free bytes is a valid cached value
        # (a full disk) and must not force a new query on every call
        mustquery = (self.freebytes is None or
                     now - self.lastquerytimestamp > self.queryperiodsecs)

        if mustquery:
            self.lastquerytimestamp = now
            self.freebytes = self.disk.getfreebytes(self.path)

        return self.freebytes
class Gc:
    '''Implements the cta-fst-gcd daemon that runs on an EOS FST and garbage
    collects EOS disk copies that have been safely stored to tape.

    The cta-fst-gcd daemon scans across every single EOS disk file on an FST.
    A file is garbage collected if:

    * The amount of free space on the corresponding file system is
      considered too low.

    * The file is considered old enough to be garbage collected.

    The cta-fst-gcd daemon garbage collects an EOS disk file by extracting the
    hexadecimal EOS file identifier from the local disk filename and then
    running eos stagerrm fxid:<fid-hex>.'''

    # Sub-directories and disk files of interest are named with exactly 8
    # hexadecimal digits.  Compiled once instead of once per directory entry.
    _HEXNAME = re.compile('^[0-9A-Fa-f]{8}$')

    def __init__(self, log, fqdn, disk, eos, config):
        '''Stores the collaborators, resets the report counters and logs the
        configuration.

        log is a logger-like object, fqdn is the fully qualified name of this
        host as reported by EOS, disk and eos wrap the local disk and the EOS
        MGM respectively and config provides the configuration attributes.'''
        self.log = log
        self.fqdn = fqdn
        self.disk = disk
        self.eos = eos
        self.config = config

        self.localfilesystempaths = []
        self.nbfilesconsideredsincelastreport = 0
        self.nbshouldfreespace = 0
        self.nboldfilesincelastreport = 0
        self.nbstagerrmssincelastreport = 0
        self.stagerrmbytessincelastreport = 0
        self.nbfilesbeforereport = 10000
        self.spacetrackers = SpaceTrackers(self.log, disk, self.config.queryperiodsecs)

        self.log.info("Config: logfile={}".format(self.config.logfile))
        self.log.info("Config: mgmhost={}".format(self.config.mgmhost))
        self.log.info("Config: minfreebytes={}".format(self.config.minfreebytes))
        self.log.info("Config: gcagesecs={}".format(self.config.gcagesecs))
        self.log.info("Config: queryperiodsecs={}".format(self.config.queryperiodsecs))
        self.log.info("Config: mainloopperiodsecs={}".format(self.config.mainloopperiodsecs))
        self.log.info("Config: xrdsecssskt={}".format(self.config.xrdsecssskt))

    def processfile(self, subdir, fstfile):
        '''Considers one disk file for garbage collection and emits a report
        every nbfilesbeforereport files.

        subdir is the directory containing the file and fstfile is the name of
        the file, which is also its hexadecimal EOS file identifier.'''
        spacetracker = self.spacetrackers.gettracker(subdir)
        totalfreebytes = spacetracker.getfreebytes()

        self.nbfilesconsideredsincelastreport = self.nbfilesconsideredsincelastreport + 1

        deltafreebytes = 0
        if totalfreebytes is None:
            # The free space could not be determined.  Never garbage collect on
            # unknown information: comparing None with an integer is True in
            # Python 2 and a TypeError in Python 3.
            shouldfreespace = False
        else:
            shouldfreespace = totalfreebytes < self.config.minfreebytes
            if not shouldfreespace:
                deltafreebytes = totalfreebytes - self.config.minfreebytes

        if shouldfreespace:
            self.nbshouldfreespace = self.nbshouldfreespace + 1
            self._stagerrmifold(spacetracker, subdir, fstfile)

        self._reportifdue(deltafreebytes)

    def _stagerrmifold(self, spacetracker, subdir, fstfile):
        '''Requests the removal of the disk copy of the specified file if the
        file is old enough to be garbage collected.'''
        fullpath = os.path.join(subdir, fstfile)

        try:
            filesizeandctime = self.disk.getfilesizeandctime(fullpath)
        except Exception as err:
            self.log.error(err)
            return

        if not filesizeandctime:
            return

        agesecs = time.time() - filesizeandctime.ctime
        if agesecs <= self.config.gcagesecs:
            return

        self.nboldfilesincelastreport = self.nboldfilesincelastreport + 1

        try:
            self.eos.stagerrm(fstfile, subdir)
        except Exception as err:
            # A failed request must neither stop the daemon nor be accounted
            # for as space that is about to be freed
            self.log.error("Failed to stagerrm file: subdir={} fxid={}: {}".format(subdir, fstfile, err))
            return

        spacetracker.stagerrmqueued(filesizeandctime.sizebytes)
        self.nbstagerrmssincelastreport = self.nbstagerrmssincelastreport + 1
        self.stagerrmbytessincelastreport = self.stagerrmbytessincelastreport + filesizeandctime.sizebytes

    def _reportifdue(self, deltafreebytes):
        '''Logs a report and resets the counters once nbfilesbeforereport files
        have been considered since the last report.'''
        if self.nbfilesbeforereport != self.nbfilesconsideredsincelastreport:
            return

        self.log.info(
            'Report: nbfiles={}, nbshouldfreespace={}, nbold={}, nbstagerrm={}, stagerrmbytes={}, deltafreebytes={}'
            .format(self.nbfilesconsideredsincelastreport, self.nbshouldfreespace, self.nboldfilesincelastreport,
                    self.nbstagerrmssincelastreport, self.stagerrmbytessincelastreport, deltafreebytes))
        self.nbfilesconsideredsincelastreport = 0
        self.nbshouldfreespace = 0
        self.nboldfilesincelastreport = 0
        self.nbstagerrmssincelastreport = 0
        self.stagerrmbytessincelastreport = 0

    def processfssubdir(self, subdir):
        '''Processes every disk file of the specified sub-directory whose name
        is made of 8 hexadecimal digits.'''
        fstfiles = [f for f in self.disk.listdir(subdir)
                    if self._HEXNAME.match(f) and self.disk.isfile(os.path.join(subdir, f))]
        for fstfile in fstfiles:
            self.processfile(subdir, fstfile)

    def processfs(self, path):
        '''Processes every sub-directory of the specified file system whose name
        is made of 8 hexadecimal digits.'''
        fssubdirs = [os.path.join(path, f) for f in self.disk.listdir(path)
                     if self._HEXNAME.match(f) and self.disk.isdir(os.path.join(path, f))]
        for fssubdir in fssubdirs:
            self.processfssubdir(fssubdir)

    def logfilesystempaths(self):
        '''Logs the number and the paths of the local file systems.'''
        self.log.info('Number of local file systems is {}'.format(len(self.localfilesystempaths)))
        for i, path in enumerate(self.localfilesystempaths):
            self.log.info('Local file system {}: path={}'.format(i, path))

    def processallfs(self):
        '''Processes every EOS file system that is local to this FST.'''
        # Get the paths of the EOS filesystems local to this FST and log them if
        # they have changed
        try:
            filesystems = self.eos.fsls()
        except Exception as err:
            self.log.error("Failed to determine the EOS filesystems local to this FST: {}".format(err))
            return

        if not filesystems:
            return

        newlocalfilesystempaths = [fs["path"] for fs in filesystems
                                   if "path" in fs and "host" in fs and self.fqdn == fs["host"]]
        if newlocalfilesystempaths != self.localfilesystempaths:
            self.localfilesystempaths = newlocalfilesystempaths
            self.logfilesystempaths()

        for path in self.localfilesystempaths:
            self.processfs(path)

    def run(self, runonlyonce = False):
        '''Runs the main loop of the daemon.

        Each iteration processes all of the local file systems and then sleeps
        for what remains of config.mainloopperiodsecs.  If runonlyonce is True
        then a single iteration is executed and the method returns without
        sleeping.'''
        while True:
            before = time.time()
            self.processallfs()

            # There is no next iteration to wait for
            if runonlyonce:
                break

            period = time.time() - before
            if period < self.config.mainloopperiodsecs:
                sleeptime = self.config.mainloopperiodsecs - period
                self.log.debug('Sleeping {} seconds'.format(sleeptime))
                time.sleep(sleeptime)
def parseconf(conffile):
    '''Parses the specified configuration file and returns the result as a
    GcConfig object.

    Raises a UserError if the file does not exist, is not a regular file,
    cannot be read, cannot be parsed, lacks a mandatory option of the main
    section or contains a non-integer value where an integer is expected.'''
    if not os.path.isfile(conffile):
        raise UserError("The configuration file {} is not a regular file or does not exist".format(conffile))
    if not os.access(conffile, os.R_OK):
        raise UserError("Cannot access for reading the configuration file {}".format(conffile))

    parser = ConfigParser.ConfigParser()

    try:
        # Parsing is done inside the try block so that syntax errors are
        # reported as user errors, and the file is closed in all cases
        with open(conffile) as conffp:
            parser.readfp(conffp)

        config = GcConfig()
        config.logfile = parser.get('main', 'logfile')
        config.mgmhost = parser.get('main', 'mgmhost')
        config.minfreebytes = parser.getint('main', 'minfreebytes')
        config.gcagesecs = parser.getint('main', 'gcagesecs')
        config.queryperiodsecs = parser.getint('main', 'queryperiodsecs')
        config.mainloopperiodsecs = parser.getint('main', 'mainloopperiodsecs')
        config.xrdsecssskt = parser.get('main', 'xrdsecssskt')
        return config
    except (ConfigParser.Error, ValueError) as err:
        # getint() raises ValueError, not ConfigParser.Error, when the value
        # of an option is not an integer
        raise UserError("Error in configuration file {}: {}".format(conffile, err))
# Script entry point: user errors are printed without a traceback and an
# interruption from the keyboard ends the program silently.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass
    except UserError as err:
        print(err)
class MockTreeNode:
    '''Node of the in-memory directory tree that is walked by MockDisk.

    name is the name of the directory entry and children is the list of the
    MockTreeNode objects it contains.'''

    def __init__(self, name, children = None):
        self.name = name
        # A new list is created for each node: a list used as default argument
        # value would be shared by all of the nodes created without children
        self.children = [] if children is None else children
class MockEos:
    '''Mock of the EOS storage system that counts the calls it receives.

    filesystems is the list of dictionaries to be returned by fsls().'''

    def __init__(self, filesystems = None):
        # A new list is created for each mock: a list used as default argument
        # value would be shared by all of the mocks created without filesystems
        self.filesystems = [] if filesystems is None else filesystems
        self.nbfsls = 0
        self.nbstagerrm = 0

    def fsls(self):
        '''Returns the configured file systems and counts the call.'''
        self.nbfsls = self.nbfsls + 1
        return self.filesystems

    def stagerrm(self, fxid, subdir):
        '''Counts the call without doing anything else.'''
        self.nbstagerrm = self.nbstagerrm + 1
disksystem = MockDisk(self.mocktree, self.freebytes, self.filectime) + spacetrackers = ctafstgcd.SpaceTrackers(self.log, disksystem, queryperiodsecs) + + self.assertEqual(0, spacetrackers.getnbtrackers()) + spacetracker = spacetrackers.gettracker(path) + self.assertEqual(1, spacetrackers.getnbtrackers()) + self.assertEqual(path, spacetracker.path) + + def test_gettracker_root_twice(self): + disksystem = MockDisk(self.mocktree, self.freebytes, self.filectime) + queryperiodsecs = 0 + path = "/" + spacetrackers = ctafstgcd.SpaceTrackers(self.log, disksystem, queryperiodsecs) + + self.assertEqual(0, spacetrackers.getnbtrackers()) + firsttracker = spacetrackers.gettracker(path) + self.assertEqual(1, spacetrackers.getnbtrackers()) + self.assertEqual(path, firsttracker.path) + + secondtracker = spacetrackers.gettracker(path) + self.assertEqual(1, spacetrackers.getnbtrackers()) + self.assertEqual(path, secondtracker.path) + + self.assertEqual(firsttracker, secondtracker) + +class GcTestCase(unittest.TestCase): + def setUp(self): + self.log = DummyLogger() + self.mocktree = MockTreeNode("/") + self.freebytes = 10000 + self.filesizeandctime = ctafstgcd.FileSizeAndCtime() + self.filesizeandctime.sizebytes = 1000 + self.filesizeandctime.ctime = time.time() + self.fqdn = "a.b.c" + + self.config = ctafstgcd.GcConfig() + self.config.logfile = 'logfile' + self.config.mgmhost = 'mgmhost' + self.config.minfreebytes = 0 + self.config.gcagesecs = 1000 + self.config.queryperiodsecs = 0 + self.config.mainloopperiodsecs = 0 + self.config.xrdsecssskt = '' + + def tearDown(self): + pass + + def test_constructor(self): + disk = MockDisk(self.mocktree, self.freebytes, self.filesizeandctime) + eos = MockEos() + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + gc = 
ctafstgcd.Gc(self.log, self.fqdn, disk, eos, self.config) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + def test_run_only_once_no_fs(self): + disk = MockDisk(self.mocktree, self.freebytes, self.filesizeandctime) + eos = MockEos() + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + gc = ctafstgcd.Gc(self.log, self.fqdn, disk, eos, self.config) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + runonlyonce = True + gc.run(runonlyonce) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(1, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + def test_run_only_once_one_fs(self): + mockfs = MockTreeNode("filesystem1") + mocktree = MockTreeNode("/", [mockfs]) + disk = MockDisk(mocktree, self.freebytes, self.filesizeandctime) + + filesystem1 = { + "path" : "/filesystem1", + "host" : self.fqdn + } + filesystems = [filesystem1] + eos = MockEos(filesystems) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, 
eos.nbstagerrm) + + gc = ctafstgcd.Gc(self.log, self.fqdn, disk, eos, self.config) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + runonlyonce = True + gc.run(runonlyonce) + + self.assertEqual(1, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(1, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + def test_run_only_once_one_fs_one_subdir(self): + mocksubdir = MockTreeNode("12345678") + mockfs = MockTreeNode("filesystem1", [mocksubdir]) + mocktree = MockTreeNode("/", [mockfs]) + disk = MockDisk(mocktree, self.freebytes, self.filesizeandctime) + + filesystem1 = { + "path" : "/filesystem1", + "host" : self.fqdn + } + filesystems = [filesystem1] + eos = MockEos(filesystems) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + gc = ctafstgcd.Gc(self.log, self.fqdn, disk, eos, self.config) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + runonlyonce = True + gc.run(runonlyonce) + + self.assertEqual(2, disk.nblistdir) + self.assertEqual(1, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(1, eos.nbfsls) + self.assertEqual(0, 
eos.nbstagerrm) + + def test_run_only_once_one_fs_one_subdir_one_file_free_space(self): + mockfile = MockTreeNode("90abcdef") + mocksubdir = MockTreeNode("12345678", [mockfile]) + mockfs = MockTreeNode("filesystem1", [mocksubdir]) + mocktree = MockTreeNode("/", [mockfs]) + disk = MockDisk(mocktree, self.freebytes, self.filesizeandctime) + + filesystem1 = { + "path" : "/filesystem1", + "host" : self.fqdn + } + filesystems = [filesystem1] + eos = MockEos(filesystems) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + gc = ctafstgcd.Gc(self.log, self.fqdn, disk, eos, self.config) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + runonlyonce = True + gc.run(runonlyonce) + + self.assertEqual(2, disk.nblistdir) + self.assertEqual(1, disk.nbisdir) + self.assertEqual(1, disk.nbisfile) + self.assertEqual(1, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(1, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + def test_run_only_once_one_fs_one_subdir_one_file_no_free_space_young_file(self): + mockfile = MockTreeNode("90abcdef") + mocksubdir = MockTreeNode("12345678", [mockfile]) + mockfs = MockTreeNode("filesystem1", [mocksubdir]) + mocktree = MockTreeNode("/", [mockfs]) + disk = MockDisk(mocktree, self.freebytes, self.filesizeandctime) + + filesystem1 = { + "path" : "/filesystem1", + "host" : self.fqdn + } + filesystems = [filesystem1] + eos = MockEos(filesystems) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + 
self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + self.config.minfreebytes = self.freebytes + 1 + + gc = ctafstgcd.Gc(self.log, self.fqdn, disk, eos, self.config) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + runonlyonce = True + gc.run(runonlyonce) + + self.assertEqual(2, disk.nblistdir) + self.assertEqual(1, disk.nbisdir) + self.assertEqual(1, disk.nbisfile) + self.assertEqual(1, disk.nbgetfreebytes) + self.assertEqual(1, disk.nbgetfilesizeandctime) + self.assertEqual(1, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + def test_run_only_once_one_fs_one_subdir_one_file_no_free_space_old_file(self): + mockfile = MockTreeNode("90abcdef") + mocksubdir = MockTreeNode("12345678", [mockfile]) + mockfs = MockTreeNode("filesystem1", [mocksubdir]) + mocktree = MockTreeNode("/", [mockfs]) + filesizeandctime = ctafstgcd.FileSizeAndCtime() + filesizeandctime.sizebytes = 1000 + filesizeandctime.ctime = time.time() - self.config.gcagesecs - 1 + disk = MockDisk(mocktree, self.freebytes, filesizeandctime) + + filesystem1 = { + "path" : "/filesystem1", + "host" : self.fqdn + } + filesystems = [filesystem1] + eos = MockEos(filesystems) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + self.config.minfreebytes = self.freebytes + 1 + + gc = ctafstgcd.Gc(self.log, self.fqdn, disk, eos, self.config) + + self.assertEqual(0, disk.nblistdir) + self.assertEqual(0, disk.nbisdir) + self.assertEqual(0, 
disk.nbisfile) + self.assertEqual(0, disk.nbgetfreebytes) + self.assertEqual(0, disk.nbgetfilesizeandctime) + self.assertEqual(0, eos.nbfsls) + self.assertEqual(0, eos.nbstagerrm) + + runonlyonce = True + gc.run(runonlyonce) + + self.assertEqual(2, disk.nblistdir) + self.assertEqual(1, disk.nbisdir) + self.assertEqual(1, disk.nbisfile) + self.assertEqual(1, disk.nbgetfreebytes) + self.assertEqual(1, disk.nbgetfilesizeandctime) + self.assertEqual(1, eos.nbfsls) + self.assertEqual(1, eos.nbstagerrm) + +if __name__ == '__main__': + suites = [] + suites.append(unittest.TestLoader().loadTestsFromTestCase(RealDiskCase)) + suites.append(unittest.TestLoader().loadTestsFromTestCase(RealEosCase)) + suites.append(unittest.TestLoader().loadTestsFromTestCase(SpaceTrackerCase)) + suites.append(unittest.TestLoader().loadTestsFromTestCase(SpaceTrackersCase)) + suites.append(unittest.TestLoader().loadTestsFromTestCase(GcTestCase)) + + suite = unittest.TestSuite(suites) + + result = unittest.TextTestRunner(verbosity=2).run(suite) + if len(result.failures) == 0: + sys.exit(0) + else: + sys.exit(1)