Skip to content
Snippets Groups Projects
Commit 97c0a5a4 authored by Giuseppe Lo Presti's avatar Giuseppe Lo Presti
Browse files

Dropped listing of tape transfers: this is going to be replaced by querying...

Dropped listing of tape transfers: this is going to be replaced by querying the xroot server for the ongoing transfers
parent 8a182a57
Branches
Tags
No related merge requests found
......@@ -54,8 +54,6 @@ class ManagerThread(threading.Thread):
self.transferQueue.pollD2dDest()
# check for transfer to be canceled for various reasons (timeout, missing resources, ...)
self.transferQueue.checkForTransfersCancelation()
# recompute list of ongoing tape transfers
self.runningTransfers.listTapeTransfers()
except connectionpool.Timeout, e:
# 'Caught exception in Management thread' message
dlf.writenotice(msgs.MGMTEXCEPTION, Type=str(e.__class__), Message=str(e))
......
......@@ -61,94 +61,6 @@ class RunningTransfersSet(object):
# lock for the tapeTransfers variable
self.tapelock = threading.Lock()
def listTapeTransfers(self):
'''updates the list of ongoing tape transfers'''
# the disk manager daemon still considers a migration stream as active even
# if no physical transfer is taking place for up to 10 seconds after the
# stream has ended.
#
# This logic prevents the disk manager daemon from starting a transfer just
# at the point when one tape migration stops and another one starts.
# Here we simply perform some maintenance operations on the list of
# previously known tape transfers.
for key in self.prevTapeTransfers.keys():
if self.prevTapeTransfers[key].transferType == TapeTransferType.RECALL:
del self.prevTapeTransfers[key] # always get rid of recall cases
elif self.prevTapeTransfers[key].lastTimeViewed < time.time() - 10:
del self.prevTapeTransfers[key]
else:
# reset the hostname to '-', this is not strictly necessary but
# provides a means to see which tape migrations are ongoing and those
# which have just ended
self.prevTapeTransfers[key].clientHost = '-'
# loop over all processes listed in proc
procpath = os.path.sep + 'proc'
for pid in os.listdir(procpath):
if not pid.isdigit():
continue # not a pid
try:
pidpath = os.path.join(procpath, pid)
cmdpath = os.path.join(pidpath, 'cmdline')
cmdline = open(cmdpath, 'rb').read()
# ignore non rfiod processes
if cmdline[0:18] != '/usr/bin/rfiod\0-sl':
continue
# container to hold the paramaters listed in the rfiod information
# file
params = {}
# open the rfio information file and extract the key value pairs
infopath = os.path.join('/var/lib/rfiod', pid + '.info')
f = open(infopath, 'r')
for line in f.readlines():
key, value = line.strip().split('=')
params[key] = value
f.close()
# process the parameters in the file
if params['CASTOR_USER'] != 'stage':
continue # ignore processes not running as stage
if params['CASTOR_D2D'] == 'true':
continue # ignore processes which are d2d transfers
# use the access mode to determine the type of transfer. If open
# exclusively for read then it's a migration stream
transfertype = None
if params['CASTOR_ACCESSMODE'] == 'r':
transfertype = TapeTransferType.MIGRATION
elif params['CASTOR_ACCESSMODE'] == 'w':
transfertype = TapeTransferType.RECALL
if transfertype == None:
continue
# extract the fileid and mountPoint from the filename
filepath = params['CASTOR_FILENAME']
fid, nshost = os.path.basename(filepath).split('@')
nshost = nshost.split('.')[0]
fileid = (nshost, int(fid))
mountPoint = filepath.rsplit(os.sep, 2)[0]+os.sep
# we found a tape transfer
key = str(pid) + ":" + params['CASTOR_OPENTIME']
self.prevTapeTransfers[key] = TapeTransfer(transfertype,
int(params['CASTOR_OPENTIME']),
params['CASTOR_CLIENTHOSTNAME'],
fileid,
mountPoint,
time.time())
except Exception:
# ignore any exceptions, these are probably related to attempts to
# access process information which doesn't exist because the process
# disappeared when we were analysing it.
continue
# renew the tapeTransfers list
self.tapelock.acquire()
try:
# reset the list
self.tapeTransfers = []
# update the list
for transfer in self.prevTapeTransfers.itervalues():
self.tapeTransfers.append(transfer)
finally:
self.tapelock.release()
def populate(self):
'''populates the list of ongoing transfers from the system.
Then synchronize with the stager DB (through the transfer manager)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment