Skip to content
Snippets Groups Projects
Commit fb256058 authored by Giuseppe Lo Presti
Browse files

Fixed race condition in starting sources of disk-to-disk copies. We need to...

Fixed race condition in starting sources of disk-to-disk copies. We need to mark the source RUNNING before the destination has a chance to talk to the source diskserver.
parent 4a942aa7
Branches
Tags
No related merge requests found
......@@ -78,19 +78,21 @@ class ActivityControlThread(threading.Thread):
srcDS, srcPath = srcDcPath.split(':', 1)
cmdLine = ['xrdcp', '-N', buildXrootURL(srcDS, srcPath, transfer.transferId, transferType), \
buildXrootURL('localhost', destDcPath, transfer.transferId, transferType)]
# "Transfer starting" message
dlf.write(msgs.TRANSFERSTARTING, subreqid=transfer.transferId,
reqid=transfer.reqId, fileId=transfer.fileId,
transferType=TransferType.toStr(transfer.transferType),
cmdLine=' '.join(cmdLine))
# start transfer process
process = 0
try:
# "Transfer starting" message
dlf.write(msgs.TRANSFERSTARTING, subreqid=transfer.transferId,
reqid=transfer.reqId, fileId=transfer.fileId,
transferType=TransferType.toStr(transfer.transferType),
cmdLine=' '.join(cmdLine))
self.runningTransfers.add(RunningTransfer(scheduler, None, time.time(), transfer, destDcPath))
# start transfer process and store it in our set of running transfers
process = subprocess.Popen(cmdLine, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self.runningTransfers.setProcess(transfer.transferId, process)
except Exception, e:
# avoid raising standard exception, as some are used for dedicated purposes
raise Exception(str(e))
self.runningTransfers.add(RunningTransfer(scheduler, process, time.time(), transfer, destDcPath))
# log "Failed to execute mover" with all details about the transfer
dlf.writeerr(msgs.MOVERSTARTFAILED, transfer=transfer, error=str(e))
# clean up this transfer from the list of running transfers
self.runningTransfers.remove(transfer)
def startMover(self, transfer):
'''effectively starts the required mover after the client had initiated its connection'''
......@@ -130,17 +132,22 @@ class ActivityControlThread(threading.Thread):
# not enough space, we cancel the transfer
connectionpool.connections.transfersCanceled(scheduler, ((transfer.asTuple(), 28, 'No space left on device'),)) # ENOSPC
else:
# for d2d src, pretend the transfer is already running, that is take note of it.
# This is to prevent a race condition whereby the destination could be faster than us
# in receiving back the reply to transferStarting. In case we were not supposed to start,
# we 'apologize' by dropping the transfer afterwards (see the exception handling).
if transfer.transferType == TransferType.D2DSRC:
self.runningTransfers.add(RunningTransfer(scheduler, None, time.time(), transfer))
# notifies the central scheduler that we want to start this transfer
# this may raise a ValueError exception if we should give up (e.g. the
# job has already started somewhere else)
result = connectionpool.connections.transferStarting(scheduler, transfer.asTuple())
if transfer.transferType == TransferType.D2DSRC:
# in this case do not actually start for real, only take note
# in this case there's nothing to start for real and we already took note, so that's all
# "Transfer starting" message
dlf.write(msgs.TRANSFERSTARTING, subreqid=transfer.transferId,
reqid=transfer.reqId, fileId=transfer.fileId,
transferType=TransferType.toStr(transfer.transferType))
self.runningTransfers.add(RunningTransfer(scheduler, None, time.time(), transfer))
elif transfer.transferType == TransferType.D2DDST:
# here we have to launch the actual transfer
srcPath, dstPath = result
......@@ -177,6 +184,9 @@ class ActivityControlThread(threading.Thread):
# transfers in clientslistener in case of failures or time outs.
self.runningTransfers.add(RunningTransfer(qTransfer.scheduler, None, time.time(), qTransfer.transfer))
except connectionpool.Timeout:
# for d2d sources remember to drop the transfer from the list of running ones
if transfer.transferType == TransferType.D2DSRC:
self.runningTransfers.remove(self.runningTranfers.get(transfer.transferId))
# we timed out in the call to transfersCanceled or transferStarting. We need to try again
# thus we put the transfer into the priority queue and inform the scheduler
try:
......@@ -190,6 +200,9 @@ class ActivityControlThread(threading.Thread):
dlf.writenotice(msgs.TRANSFERBACKTOQUEUEFAILED, subreqid=transfer.transferId,
reqid=transfer.reqId, fileId=transfer.fileId, originalError='Timeout', error=e)
except TransferCanceled, e:
# for d2d sources remember to drop the transfer from the list of running ones
if transfer.transferType == TransferType.D2DSRC:
self.runningTransfers.remove(self.runningTranfers.get(transfer.transferId))
# 'Transfer start canceled' message
dlf.writedebug(msgs.TRANSFERSTARTCANCELED, reason=e.args, subreqId=transfer.transferId, fileId=transfer.fileId)
# the transfer has already started somewhere else, or has been canceled, so give up
......@@ -213,7 +226,7 @@ class ActivityControlThread(threading.Thread):
# startup of the transfer failed with unexpected error
# We need to try again thus we put the transfer into the queue and inform the scheduler
# note that we put it now at the end of the queue (opposite approach compared to
# the Timeout case). This allows other transfers to go through in cas of persistent
# the Timeout case). This allows other transfers to go through in case of persistent
# errors (e.g. Connection refused because of a dead transfermanagerd)
try:
self.transferQueue.put(scheduler, transfer)
......@@ -227,14 +240,17 @@ class ActivityControlThread(threading.Thread):
# 'Failed to start transfer and got timeout when putting back to queue'
dlf.writeerr(msgs.TRANSFERBACKTOQUEUEFAILED, subreqid=transfer.transferId,
reqid=transfer.reqId, fileId=transfer.fileId, originalError=e, error=e2)
# for d2d sources remember to drop the transfer from the list of running ones
if transfer.transferType == TransferType.D2DSRC:
self.runningTransfers.remove(self.runningTranfers.get(transfer.transferId))
# wait a bit to avoid a tight loop in case of persistent errors
time.sleep(.1)
else:
# all slots are full, wait a bit before checking again
time.sleep(.05)
except Exception:
except Exception, e:
# "Caught exception in ActivityControl thread" message
dlf.writeerr(msgs.ACTIVITYCONTROLEXCEPTION)
dlf.writeerr(msgs.ACTIVITYCONTROLEXCEPTION, error=e)
time.sleep(1)
def stop(self):
......
......@@ -106,7 +106,7 @@ class MoverReqHandlerThread(threading.Thread):
except KeyError:
# transfer not found: assume it already timed out, raise error
# "Transfer slot timed out" message
dlf.writenotice(msgs.TRANSFERTIMEDOUT, subreqId=transferid)
dlf.writenotice(msgs.TRANSFERTIMEDOUT, subreqId=transferid, transferType=transferType)
raise
if errCode != 0:
# this is a user transfer that has to be failed as we got an error from xroot
......@@ -184,7 +184,7 @@ class MoverReqHandlerThread(threading.Thread):
timeout=timeout)
t.ended = True
return 0
elif t.transfer.transferType == TransferType.D2DSRC or type(t) == transfer.TapeTransfer:
elif t.transfer.transferType == TransferType.D2DSRC or type(t) == TapeTransfer:
# nothing else to be done for sources and tape transfers
return 0
else:
......
......@@ -203,7 +203,6 @@ class RunningTransfersSet(object):
finally:
self.tapelock.release()
def nbTransfers(self, reqUser=None, detailed=False):
'''returns number of running transfers and number of running slots, plus details
per protocol if the detailed parameter is true.
......@@ -398,7 +397,7 @@ class RunningTransfersSet(object):
errMsg = 'Transfer has been killed by signal %d' % (-rc)
errCode = 1015 # SEINTERNAL
elif rc > 0: # these are transfers that got interrupted or somehow failed
errMsg = 'Mover exited with failure, rc=%d' % rc
errMsg = 'Mover exited with failure, rc=%d' % rc
errCode = 1015 # SEINTERNAL
if rTransfer.scheduler not in failedTransfers:
failedTransfers[rTransfer.scheduler] = []
......@@ -419,7 +418,7 @@ class RunningTransfersSet(object):
# "Informed scheduler that transfer failed" message
dlf.write(msgs.INFORMTRANSFERFAILED, Scheduler=scheduler, subreqId=transferId,
reqId=reqId, fileid=fileId, errCode=rc, Message=msg)
except connectionpool.Timeout, e:
except connectionpool.Timeout:
for transferId, fileId, rc, msg, reqId in failedTransfers[scheduler]:
# "Failed to inform scheduler that transfer failed" message
dlf.writenotice(msgs.INFORMTRANSFERFAILEDFAILED, Scheduler=scheduler,
......@@ -428,7 +427,7 @@ class RunningTransfersSet(object):
for transferId, fileId, rc, msg, reqId in failedTransfers[scheduler]:
# "Failed to inform scheduler that transfer failed" message
dlf.writeerr(msgs.INFORMTRANSFERFAILEDFAILED, Scheduler=scheduler,
subreqId=transferId, reqId=reqId, fileid=fileId, Message=msg)
subreqId=transferId, reqId=reqId, fileid=fileId, Message=msg, error=e)
def listTransfers(self, reqUser=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment