Commit 6aea1783 authored by Steven Murray's avatar Steven Murray
Browse files

Removed python-cta package because it really should only have contained one class, namely UserError

parent 7dacda66
......@@ -170,7 +170,6 @@ if (${COMPILE_PACKAGING} STREQUAL "1")
RPMTools_ADD_RPM_TARGETS(
${PROJECT_NAME} ${PROJECT_NAME}.spec.in)
endif (RPMTools_FOUND)
add_dependencies(cta_rpm cta_python_package_rpm)
endif (${COMPILE_PACKAGING} STREQUAL "1")
#add_custom_target(test test/castorUnitTests
......
......@@ -389,7 +389,6 @@ Summary: Tape aware garbage collector daemon to run on an EOS FST
Group: Application/CTA
Requires: eos-client
Requires: python
Requires: python-cta = %{version}-%{release}
%description -n cta-fst-gcd
cta-fst-gcd is a daemon that runs on an EOS FST and garbage
collects EOS disk copies that have been safely stored to tape.
......
......@@ -15,8 +15,4 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
cmake_minimum_required (VERSION 2.6)
#install (PROGRAMS cta-fst-gcd DESTINATION usr/bin)
#install (FILES ${CMAKE_CURRENT_SOURCE_DIR}/cta-fst-gcd.1cta DESTINATION /usr/share/man/man1)
add_subdirectory(eosfstgcd)
add_subdirectory(package)
......@@ -16,6 +16,265 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import cta.fst.gc
import argparse
import ConfigParser
import datetime
import getpass
import logging
import logging.config
import os
import re
import socket
import subprocess
import sys
import time
cta.fst.gc.main()
class UserError(Exception):
pass
class Gc:
def get_env_mgmhost(self):
if "EOS_MGM_URL" in self.env:
mgm_url = self.env["EOS_MGM_URL"]
if mgm_url:
return re.sub("^x?root://", "", mgm_url)
def get_sysconfig_file_mgmhost(self, sysconfig_file):
for line in sysconfig_file:
mgmhostline = re.match("^EOS_MGM_HOST=.*", line)
if mgmhostline:
splitmgmhostline = mgmhostline.group(0).split('=')
if 2 == len(splitmgmhostline):
return splitmgmhostline[1]
def get_syconfig_mgmhost(self):
if os.path.isfile("/etc/sysconfig/eos_env"):
sysconfig_file = open("/etc/sysconfig/eos_env", "r")
return self.get_sysconfig_file_mgmhost(sysconfig_file)
def setmgmhost(self):
self.mgmhost = self.get_env_mgmhost()
if self.mgmhost:
return
self.mgmhost = self.get_syconfig_mgmhost()
if self.mgmhost:
return
raise Exception("Failed to determine the MGM host")
def configuredummylogging(self):
config = {
'version': 1,
'disable_existing_loggers': False,
'loggers': {
'gc' : {
'level': 'INFO'
}
}
}
logging.config.dictConfig(config)
def configurereallogging(self):
if None == self.logfilepath:
raise Exception("Cannot configure file based logging because the log file path has not been set")
loggingdir = os.path.dirname(self.logfilepath)
if not os.path.isdir(loggingdir):
raise UserError("The logging directory {} is not a directory or does not exist".format(loggingdir))
if not os.access(loggingdir, os.W_OK):
raise UserError("The logging directory {} cannot be written to by {}".format(loggingdir, self.programname))
config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'stdout': {
'format': '%(asctime)s.%(msecs)03d000 %(levelname)s ' + self.programname +
': LVL="%(levelname)s" PID="%(process)d" TID="%(process)d" MSG="%(message)s"',
'datefmt': '%Y/%m/%d %H:%M:%S'
}
},
'handlers': {
'logfile': {
'level': 'INFO',
'formatter': 'stdout',
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename' : self.logfilepath,
'when' : 'midnight'
}
},
'loggers': {
'gc' : {
'handlers': ['logfile'],
'level': 'INFO'
}
}
}
# Failing to configure the logging system is usually a user error
try:
logging.config.dictConfig(config)
except Exception as err:
raise UserError(err)
def configurelogging(self):
if None == self.logfilepath:
self.configuredummylogging()
else:
self.configurereallogging()
def __init__(self, env, logfilepath = '/var/log/eos/fst/cta-fst-gcd.log'):
self.programname = 'cta-fst-gcd'
self.env = env
self.conffilepath = '/etc/cta/cta-fst-gcd.conf'
self.logfilepath = logfilepath
self.fqdn = socket.getfqdn()
self.localfilesystempaths = []
def eosfsls(self):
mgmurl = "root://{}".format(self.mgmhost)
cmd = "eos -r 0 0 {} fs ls -m".format(mgmurl)
env = os.environ.copy()
env["XrdSecPROTOCOL"] = "sss"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
stdout,stderr = process.communicate()
if 0 != process.returncode:
raise Exception(
"\n"
"Failed to to execute: {}\n"
"Return code : {}\n"
"Return code strerror: {}\n"
"Standard error : {}".format(cmd, process.returncode, os.strerror(process.returncode), stderr))
result = []
lines = stdout.splitlines();
for l in lines:
linedict = {}
pairs = l.split()
for p in pairs:
splitpair = p.split('=')
if 2 == len(splitpair):
linedict[splitpair[0]] = splitpair[1]
if linedict:
result.append(linedict)
return result
def eosstagerrm(self, fxid):
logger = logging.getLogger('gc')
mgmurl = "root://{}".format(self.mgmhost)
cmd = "eos {} stagerrm fxid:{}".format(mgmurl, fxid)
env = os.environ.copy()
env["XrdSecPROTOCOL"] = "sss"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
stdout,stderr = process.communicate()
if 0 == process.returncode:
logger.info("Executed {}".format(cmd))
def processfile(self, subdir, fstfile):
statvfs = os.statvfs(subdir)
freebytes = statvfs.f_frsize * statvfs.f_bavail
spaceshouldefreed = freebytes < self.minfreebytes
if spaceshouldefreed:
fullpath = os.path.join(subdir,fstfile)
statinfo = os.stat(fullpath)
now = time.time()
agesecs = now - statinfo.st_ctime
if agesecs > self.gcagesecs:
self.eosstagerrm(fstfile)
def processfssubdir(self, subdir):
logger = logging.getLogger('gc')
fstfiles = [f for f in os.listdir(subdir)
if re.match('^[0-9A-Fa-f]{8}$', f) and os.path.isfile(os.path.join(subdir, f))]
for fstfile in fstfiles:
self.processfile(subdir, fstfile)
def processfs(self, path):
fssubdirs = [os.path.join(path, f) for f in os.listdir(path)
if re.match('^[0-9A-Fa-f]{8}$', f) and os.path.isdir(os.path.join(path, f))]
for fssubdir in fssubdirs:
self.processfssubdir(fssubdir)
def logfilesystempaths(self):
logger = logging.getLogger('gc')
logger.info('Number of local file systems is {}'.format(len(self.localfilesystempaths)))
i = 0
for path in self.localfilesystempaths:
statvfs = os.statvfs(path)
freebytes = statvfs.f_frsize * statvfs.f_bavail
logger.info('Local file system {}: path={} freebytes={}'.format(i, path, freebytes))
i = i + 1
def processallfs(self):
filesystems = self.eosfsls()
newlocalfilesystempaths = [fs["path"] for fs in filesystems if "path" in fs and "host" in fs and self.fqdn == fs["host"]]
if newlocalfilesystempaths != self.localfilesystempaths:
self.localfilesystempaths = newlocalfilesystempaths
self.logfilesystempaths();
for path in self.localfilesystempaths:
self.processfs(path)
def logconf(self):
logger = logging.getLogger('gc')
logger.info("config minfreebytes={}".format(self.minfreebytes))
logger.info("config gcagesecs={}".format(self.gcagesecs))
def readconf(self):
if not os.path.isfile(self.conffilepath):
raise UserError("The configuration file {} is not a directory or does not exist".format(self.conffilepath))
if not os.access(self.conffilepath, os.R_OK):
raise UserError("The configuration file {} cannot be read by {}".format(self.conffilepath, self.programname))
config = ConfigParser.ConfigParser()
config.read(self.conffilepath)
try:
self.minfreebytes = config.getint('main', 'minfreebytes')
self.gcagesecs = config.getint('main', 'gcagesecs')
except ConfigParser.Error as err:
raise UserError("Error with configuration file {}: {}".format(self.conffilepath, err))
def run(self):
username = getpass.getuser()
if 'daemon' != username:
raise UserError('{} must be executed as user daemon and not user {}'.format(self.programname, username))
self.setmgmhost()
self.configurelogging()
logger = logging.getLogger('gc')
logger.info('{} started'.format(self.programname))
logger.info('The EOS MGM host is {}'.format(self.mgmhost))
logger.info('The fqdn of this machine is {}'.format(self.fqdn))
self.readconf()
self.logconf()
minperiod = 300 # In seconds
while True:
before = time.time()
self.processallfs()
after = time.time()
period = after - before
if period < minperiod:
sleeptime = minperiod - period
logger.debug('Sleeping {} seconds'.format(sleeptime))
time.sleep(sleeptime)
def main():
programname = 'cta-fst-gcd'
parser = argparse.ArgumentParser()
args = parser.parse_args()
gc = Gc(os.environ)
try:
gc.run()
except UserError as err:
print "User error: {}".format(err)
if __name__ == '__main__':
main()
# The CERN Tape Archive (CTA) project
# Copyright (C) 2015 CERN
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
cmake_minimum_required (VERSION 2.6)
find_package (python REQUIRED)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.in
${CMAKE_CURRENT_BINARY_DIR}/setup.py)
set (CTA_PYTHON_PACKAGE_SRC_FILES
${CMAKE_CURRENT_SOURCE_DIR}/cta/__init__.py
${CMAKE_CURRENT_SOURCE_DIR}/cta/fst/__init__.py
${CMAKE_CURRENT_SOURCE_DIR}/cta/fst/gc.py
${CMAKE_CURRENT_SOURCE_DIR}/cta/exceptions.py)
set (CTA_PYTHON_PACKAGE_RPMS
${CMAKE_CURRENT_BINARY_DIR}/dist/python-cta-${CTA_VERSION}.${CTA_RELEASE}.noarch.rpm
${CMAKE_CURRENT_BINARY_DIR}/dist/python-cta-${CTA_VERSION}.${CTA_RELEASE}.src.rpm)
add_custom_command(OUTPUT ${CTA_PYTHON_PACKAGE_RPMS}
COMMAND ${PYTHON_PROGRAM} ${CMAKE_CURRENT_BINARY_DIR}/setup.py bdist_rpm --release ${CTA_RELEASE}.el7.cern
DEPENDS ${CTA_PYTHON_PACKAGE_SRC_FILES})
add_custom_target(cta_python_package_rpm DEPENDS ${CTA_PYTHON_PACKAGE_RPMS})
This diff is collapsed.
The tape aware FST garbage collector implemented under the CERN Tape Archive (CTA) project.
# The CERN Tape Archive (CTA) project
# Copyright (C) 2015 CERN
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# The CERN Tape Archive (CTA) project
# Copyright (C) 2015 CERN
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class UserError(Exception):
pass
# The CERN Tape Archive (CTA) project
# Copyright (C) 2015 CERN
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#!/bin/python
# The CERN Tape Archive (CTA) project
# Copyright (C) 2015 CERN
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import ConfigParser
import datetime
import getpass
import logging
import logging.config
import os
import re
import socket
import subprocess
import sys
import time
from cta.exceptions import UserError
class Gc:
def get_env_mgmhost(self):
if "EOS_MGM_URL" in self.env:
mgm_url = self.env["EOS_MGM_URL"]
if mgm_url:
return re.sub("^x?root://", "", mgm_url)
def get_sysconfig_file_mgmhost(self, sysconfig_file):
for line in sysconfig_file:
mgmhostline = re.match("^EOS_MGM_HOST=.*", line)
if mgmhostline:
splitmgmhostline = mgmhostline.group(0).split('=')
if 2 == len(splitmgmhostline):
return splitmgmhostline[1]
def get_syconfig_mgmhost(self):
if os.path.isfile("/etc/sysconfig/eos_env"):
sysconfig_file = open("/etc/sysconfig/eos_env", "r")
return self.get_sysconfig_file_mgmhost(sysconfig_file)
def setmgmhost(self):
self.mgmhost = self.get_env_mgmhost()
if self.mgmhost:
return
self.mgmhost = self.get_syconfig_mgmhost()
if self.mgmhost:
return
raise Exception("Failed to determine the MGM host")
def configuredummylogging(self):
config = {
'version': 1,
'disable_existing_loggers': False,
'loggers': {
'gc' : {
'level': 'INFO'
}
}
}
logging.config.dictConfig(config)
def configurereallogging(self):
if None == self.logfilepath:
raise Exception("Cannot configure file based logging because the log file path has not been set")
loggingdir = os.path.dirname(self.logfilepath)
if not os.path.isdir(loggingdir):
raise UserError("The logging directory {} is not a directory or does not exist".format(loggingdir))
if not os.access(loggingdir, os.W_OK):
raise UserError("The logging directory {} cannot be written to by {}".format(loggingdir, self.programname))
config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'stdout': {
'format': '%(asctime)s.%(msecs)03d000 %(levelname)s ' + self.programname +
': LVL="%(levelname)s" PID="%(process)d" TID="%(process)d" MSG="%(message)s"',
'datefmt': '%Y/%m/%d %H:%M:%S'
}
},
'handlers': {
'logfile': {
'level': 'INFO',
'formatter': 'stdout',
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename' : self.logfilepath,
'when' : 'midnight'
}
},
'loggers': {
'gc' : {
'handlers': ['logfile'],
'level': 'INFO'
}
}
}
# Failing to configure the logging system is usually a user error
try:
logging.config.dictConfig(config)
except Exception as err:
raise UserError(err)
def configurelogging(self):
if None == self.logfilepath:
self.configuredummylogging()
else:
self.configurereallogging()
def __init__(self, env, logfilepath = '/var/log/eos/fst/cta-fst-gcd.log'):
self.programname = 'cta-fst-gcd'
self.env = env
self.conffilepath = '/etc/cta/cta-fst-gcd.conf'
self.logfilepath = logfilepath
self.fqdn = socket.getfqdn()
self.localfilesystempaths = []
def eosfsls(self):
mgmurl = "root://{}".format(self.mgmhost)
cmd = "eos -r 0 0 {} fs ls -m".format(mgmurl)
env = os.environ.copy()
env["XrdSecPROTOCOL"] = "sss"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
stdout,stderr = process.communicate()
if 0 != process.returncode:
raise Exception(
"\n"
"Failed to to execute: {}\n"
"Return code : {}\n"
"Return code strerror: {}\n"
"Standard error : {}".format(cmd, process.returncode, os.strerror(process.returncode), stderr))
result = []
lines = stdout.splitlines();
for l in lines:
linedict = {}
pairs = l.split()
for p in pairs:
splitpair = p.split('=')
if 2 == len(splitpair):
linedict[splitpair[0]] = splitpair[1]
if linedict:
result.append(linedict)
return result
def eosstagerrm(self, fxid):
logger = logging.getLogger('gc')
mgmurl = "root://{}".format(self.mgmhost)
cmd = "eos {} stagerrm fxid:{}".format(self.mgmurl, fxid)
env = os.environ.copy()
env["XrdSecPROTOCOL"] = "sss"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
stdout,stderr = process.communicate()
if 0 == process.returncode:
logger.info("Executed {}".format(cmd))
def processfile(self, subdir, fstfile):
statvfs = os.statvfs(subdir)
spaceshouldefreed = statvfs.f_frsize * statvfs.f_bavail > self.minfreebytes
if spaceshouldefreed:
fullpath = os.path.join(subdir,fstfile)
statinfo = os.stat(fullpath)
now = time.time()
agesecs = now - statinfo.st_ctime
if agesecs > self.gcagesecs:
self.eosstagerrm(fstfile)
def processfssubdir(self, subdir):
logger = logging.getLogger('gc')
fstfiles = [f for f in os.listdir(subdir)
if re.match('^[0-9A-Fa-f]{8}$', f) and os.path.isfile(os.path.join(subdir, f))]
for fstfile in fstfiles:
self.processfile(subdir, fstfile)
def processfs(self, path):
fssubdirs = [os.path.join(path, f) for f in os.listdir(path)
if re.match('^[0-9A-Fa-f]{8}$', f) and os.path.isdir(os.path.join(path, f))]
for fssubdir in fssubdirs:
self.processfssubdir(fssubdir)
def logfilesystempaths(self):
logger = logging.getLogger('gc')
logger.info('Number of local file systems is {}'.format(len(self.localfilesystempaths)))
i = 0
for path in self.localfilesystempaths:
logger.info('Local file system {}: {}'.format(i, path))
i = i + 1
def processallfs(self):
filesystems = self.eosfsls()
newlocalfilesystempaths = [fs["path"] for fs in filesystems if "path" in fs and "host" in fs and self.fqdn == fs["host"]]
if newlocalfilesystempaths != self.localfilesystempaths:
self.localfilesystempaths = newlocalfilesystempaths
self.logfilesystempaths();