Commit 1362fa24 authored by Sebastien Ponce's avatar Sebastien Ponce
Browse files

Fixed bug #105005: D2D destinations do not respect backfill mechanism when source is not ready

parent 34301e50
......@@ -99,14 +99,23 @@ class LocalQueue(Queue.Queue):
finally:
self.lock.release()
def putPriority(self, scheduler, transfer):
def _putInPriorityQueue(self, transfer):
'''Put a new transfer in the priority queue'''
if transfer.transferType == TransferType.STD or \
((transfer.transferType == TransferType.D2DSRC or transfer.transferType == TransferType.D2DDST) \
and transfer.replicationType == D2DTransferType.USER):
self.priorityQueue.put(transfer.transferId)
else:
self.backfillQueue.put(transfer.transferId)
def putPriority(self, scheduler, transfer):
'''Put a new transfer in the priority queue and register it in queueingTransfers'''
self.lock.acquire()
try:
# first keep note of the new transfer (indexed by subreqId)
self.queueingTransfers[transfer.transferId] = QueueingTransfer(scheduler, transfer)
# then add it to the underlying priority queue
self.priorityQueue.put(transfer.transferId)
# then add it to the underlying priority queue, depending on its type
self._putInPriorityQueue(transfer)
finally:
self.lock.release()
......@@ -267,7 +276,7 @@ class LocalQueue(Queue.Queue):
try:
# put the transferId back into the priority queue in it's time to retry the transfer
dlf.writedebug(msgs.RETRYTRANSFER, subreqid=transferId, reqid=reqid) # "Retrying transfer" message
self.priorityQueue.put(transferId)
self._putInPriorityQueue(transferId)
del self.pendingD2dDest[transferId]
finally:
self.lock.release()
......@@ -284,7 +293,7 @@ class LocalQueue(Queue.Queue):
if timeOfNextTry < currentTime:
# put the transferId back into the priority queue in it's time to retry the transfer
dlf.writedebug(msgs.RETRYTRANSFER, subreqid=transferId) # "Retrying transfer" message
self.priorityQueue.put(transferId)
self._putInPriorityQueue(transferId)
toBeDeleted.append(transferId)
# cleanup list of pending transferIds
for tid in toBeDeleted:
......@@ -320,7 +329,7 @@ class LocalQueue(Queue.Queue):
try:
timeout = timeouts[transfer.svcClassName]
except KeyError:
try :
try:
timeout = timeouts['all']
except KeyError:
# no timeout could be found, so we take it as infinite, meaning we do not cancel anything
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment