Commit 861e5355 authored by Sebastien Ponce

Fixed a typo in the python code (a missing comma in the opaque_dict concatenation inside buildXrootURL) that prevented diskmanagerd from starting. Also cleaned up trailing spaces in the file.
parent 8344bd79
@@ -45,16 +45,16 @@ def buildXrootURL(self, diskserver, path):
    '''Builds a xroot valid url for the given path on the given diskserver'''
    # base url and key parameter
    url = 'root://'+diskserver+':1095//dummy?'
    opaque_dict= {'castor2fs.sfn' : '',
                  'castor2fs.pfn1' : path,
                  'castor2fs.pfn2' : '',
                  'castor2fs.id' : '',
                  'castor2fs.client_sec_uid' : '',
                  'castor2fs.client_sec_gid' : '',
                  'castor2fs.accessop' : '0',
                  'castor2fs.exptime' : str(int(time.time()) + 3600),
                  'castor2fs.manager' : ''}
    # signature part
    try:
        # get Xroot RSA key
@@ -62,22 +62,22 @@ def buildXrootURL(self, diskserver, path):
                                            '/opt/xrootd/keys/key.pem')
        key = RSA.importKey(open(keyFile, 'r').read())
        # sign opaque part obtained by concatenating the values
        opaque_token = ''.join([opaque_dict['castor2fs.sfn'],
                                opaque_dict['castor2fs.pfn1'],
-                               opaque_dict['castor2fs.pfn2']
+                               opaque_dict['castor2fs.pfn2'],
                                opaque_dict['castor2fs.id'],
                                opaque_dict['castor2fs.client_sec_uid'],
                                opaque_dict['castor2fs.client_sec_gid'],
                                opaque_dict['castor2fs.accessop'],
                                opaque_dict['castor2fs.exptime'],
                                opaque_dict['castor2fs.manager']])
        signature = signBase64(opaque_token, key)
        opaque = ""
        # build the opaque info
        for key, val in opaque_dict.iteritems():
            opaque += key + '=' + val + '&'
        url += opaque + 'castor2fs.signature=' + signature
        return url
    except Exception, e:
@@ -85,165 +85,165 @@ def buildXrootURL(self, diskserver, path):
        raise Exception(str(e))
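
Aside: signBase64, called above, is defined outside the hunks shown in this commit. As a hedged sketch only, assuming the same PyCrypto library that provides RSA.importKey and a PKCS#1 v1.5 signature over SHA-1 returned base64-encoded, it could look roughly like this; the actual CASTOR helper may differ.

# Hypothetical sketch of signBase64, not part of this commit: PKCS#1 v1.5
# signature of the token with the RSA private key, returned base64-encoded.
import base64
from Crypto.Hash import SHA
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5

def signBase64(data, key):
    '''signs data with the given RSA private key, returns the base64-encoded signature'''
    signer = PKCS1_v1_5.new(key)
    return base64.b64encode(signer.sign(SHA.new(data)))

# usage mirroring the code above (key.pem must contain an RSA private key)
# key = RSA.importKey(open('/opt/xrootd/keys/key.pem', 'r').read())
# signature = signBase64(opaque_token, key)
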
class ActivityControlThread(threading.Thread):
    '''activity control thread.
    This thread is responsible for starting new transfers when free slots are available'''
    def __init__(self, runningTransfers, transferQueue, configuration, fake):
        '''constructor'''
        super(ActivityControlThread, self).__init__(name='ActivityControl')
        self.alive = True
        self.runningTransfers = runningTransfers
        self.transferQueue = transferQueue
        self.configuration = configuration
        self.fake = fake
        # last time we've tried to schedule a transfer. This is used by the ActivityControlChecker
        # to find out whether the ActivityControl thread is dead or not
        self.lastschedattempt = 0
        # start the thread
        self.setDaemon(True)
        self.start()
    def checkSpaceLeftOnFS(self, transfer):
        ''' check whether there is space available for a given transfer'''
        # check whether this transfer will write data
        if transfer.transferType == TransferType.D2DSRC:
            return True
        if transfer.transferType == TransferType.STD and transfer.flags == 'r':
            return True
        # find out the limit in terms of free space, from the config file
        minFreeSpacePerc = self.configuration.getValue('DiskManager', 'FSMinAllowedFreeSpace', 0.02, float)
        if transfer.mountPoint:
            # get status of the filesystem
            stat = os.statvfs(transfer.mountPoint)
            availableSpace = stat.f_bavail * stat.f_frsize
            totalSpace = stat.f_blocks * stat.f_bsize
            # do the check
            return availableSpace > minFreeSpacePerc * totalSpace
        else:
            return True
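
Aside: the statvfs-based test above is easy to exercise on its own. Below is a minimal standalone sketch with a hypothetical helper name and an arbitrary mount point; 0.02 mirrors the FSMinAllowedFreeSpace default used above.

# Standalone illustration only, not part of this commit.
import os

def hasEnoughFreeSpace(mountPoint, minFreeSpacePerc=0.02):
    '''returns True when more than minFreeSpacePerc of the filesystem is still free'''
    stat = os.statvfs(mountPoint)
    availableSpace = stat.f_bavail * stat.f_frsize
    totalSpace = stat.f_blocks * stat.f_bsize
    return availableSpace > minFreeSpacePerc * totalSpace

print hasEnoughFreeSpace('/')
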
    def startD2DTransfer(self, scheduler, transfer, srcDcPath, destDcPath):
        '''effectively starts a disk to disk transfer'''
        # build command line
        srcDS, srcPath = srcDcPath.split(':', 1)
        cmdLine = ['xrdcp', buildXrootURL(self, srcDS, srcPath), \
                   buildXrootURL(self, 'localhost', destDcPath)]
        # "Transfer starting" message
        dlf.write(msgs.TRANSFERSTARTING, subreqid=transfer.transferId,
                  reqid=transfer.reqId, fileId=transfer.fileId,
                  TransferType=TransferType.toStr(transfer.transferType),
                  cmdLine=' '.join(cmdLine))
        # start transfer process
        process = 0
        try:
            process = subprocess.Popen(cmdLine, close_fds=True, stderr=subprocess.PIPE)
        except Exception, e:
            # avoid raising standard exception, as some are used for dedicated purposes
            raise Exception(str(e))
        self.runningTransfers.add(RunningTransfer(scheduler, process, time.time(), transfer, destDcPath))
    def run(self):
        '''main method, containing the infinite loop'''
        while self.alive:
            try:
                # check number of running transfers against limit
                if self.runningTransfers.nbUsedSlots() < self.configuration.getValue('DiskManager', 'NbSlots', 0, int):
                    # get a new transfer from the transferQueue
                    qTransfer = self.transferQueue.get()
                    self.lastschedattempt = time.time()
                    # check whether we got something or timed out
                    if qTransfer == None:
                        # We timed out, retry straight
                        continue
                    scheduler, transfer = qTransfer.scheduler, qTransfer.transfer
                    try:
                        # in case of transfers wanting to write data, we check that space is available
                        if not self.checkSpaceLeftOnFS(transfer):
                            # not enough space, we cancel the transfer
                            connectionpool.connections.transfersCanceled(scheduler, ((transfer.asTuple(), 28, 'No space left on device'),)) # ENOSPC
                        else:
                            # notifies the central scheduler that we want to start this transfer
                            # this may raise a ValueError exception if we should give up (e.g. the
                            # job has already started somewhere else)
                            result = connectionpool.connections.transferStarting(scheduler, transfer.asTuple())
                            # in case of d2dsrc, do not actually start for real, only take note
                            if transfer.transferType == TransferType.D2DSRC:
                                # "Transfer starting" message
                                dlf.write(msgs.TRANSFERSTARTING, subreqid=transfer.transferId,
                                          reqid=transfer.reqId, fileId=transfer.fileId,
                                          TransferType=TransferType.toStr(transfer.transferType))
                                self.runningTransfers.add(RunningTransfer(scheduler, None, time.time(), transfer, None))
                            elif transfer.transferType == TransferType.D2DDST:
                                # we have to launch the actual transfer
                                srcPath, dstPath = result
                                self.startD2DTransfer(scheduler, transfer, srcPath, dstPath)
                            else:
                                # "Transfer starting" message
                                dlf.write(msgs.TRANSFERSTARTING, subreqid=transfer.transferId,
                                          reqid=transfer.reqId, fileId=transfer.fileId,
                                          TransferType=TransferType.toStr(transfer.transferType))
                                # start the transfer
                                if not self.fake:
                                    try:
                                        process = subprocess.Popen(transferToCmdLine(transfer), close_fds=True, stderr=subprocess.PIPE)
                                    except Exception, e:
                                        # avoid raising standard exception, as some are used for dedicated purposes
                                        raise Exception(str(e))
                                else:
                                    process = 0
                                self.runningTransfers.add(RunningTransfer(scheduler, process, time.time(), transfer, None))
                    except connectionpool.Timeout:
                        # we timed out in the call to transfersCanceled or transferStarting. We need to try again
                        # thus we put the transfer into the priority queue and inform the scheduler
                        try:
                            self.transferQueue.putPriority(scheduler, transfer)
                            connectionpool.connections.transferBackToQueue(scheduler, transfer.asTuple())
                            # 'Timeout when trying to start/cancel transfer. Putting it back to the queue' message
                            dlf.writenotice(msgs.TRANSFERSTARTTIMEOUT, subreqid=transfer.transferId,
                                            reqid=transfer.reqId, fileId=transfer.fileId)
                        except Exception, e:
                            # 'Failed to start transfer and got timeout when putting back to queue'
                            dlf.writenotice(msgs.TRANSFERBACKTOQUEUEFAILED, subreqid=transfer.transferId,
                                            reqid=transfer.reqId, fileId=transfer.fileId, originalError='Timeout', error=e)
                    except ValueError, e:
                        # 'Transfer start canceled' message
                        dlf.writedebug(msgs.TRANSFERSTARTCANCELED, reason=e.args, subreqid=transfer.transferId,
                                       fileId=transfer.fileId)
                        # the transfer has already started somewhere else, or has been canceled, so give up
                    except EnvironmentError:
                        # we have tried to start a disk to disk copy and the source is not yet ready
                        # we will put it in a pending queue
                        # "Start postponned until source is ready" message
                        dlf.write(msgs.POSTPONEDFORSRCNOTREADY, subreqid=transfer.transferId,
                                  reqid=transfer.reqId, fileId=transfer.fileId)
                        # put the transfer into the pending queue
                        self.transferQueue.d2dDestReady(scheduler, transfer)
                    except Exception, e:
                        # startup of the transfer failed with unexpected error
                        # We need to try again thus we put the transfer into the queue and inform the scheduler
                        # note that we put it now at the end of the queue (opposite approach compared to
                        # the Timeout case). This allows other transfers to go through in case of persistent
                        # errors (e.g. Connection refused because of a dead transfermanagerd)
                        try:
                            self.transferQueue.put(scheduler, transfer)
                            connectionpool.connections.transferBackToQueue(scheduler, transfer.asTuple())
                            # 'Failed to start transfer. Putting it back to the queue' message
                            dlf.writeerr(msgs.TRANSFERSTARTINGFAILED, subreqid=transfer.transferId,
                                         reqid=transfer.reqId, fileId=transfer.fileId, error=e)
                            time.sleep(1)
                        except Exception, e2:
                            # clear this exception context, i.e. the Timeout, so that we log the original error
                            sys.exc_clear()
                            # 'Failed to start transfer and got timeout when putting back to queue'
                            dlf.writeerr(msgs.TRANSFERBACKTOQUEUEFAILED, subreqid=transfer.transferId,
                                         reqid=transfer.reqId, fileId=transfer.fileId, originalError=e, error=e2)
                else:
                    time.sleep(.05)
            except Exception:
                # "Caught exception in ActivityControl thread" message
                dlf.writeerr(msgs.ACTIVITYCONTROLEXCEPTION)
                time.sleep(1)
    def stop(self):
        '''stops processing in this thread'''
        self.alive = False