Commit 037f6307 authored by Sebastien Ponce's avatar Sebastien Ponce
Browse files

Fixed bug #104998: Synchronization of d2d srcs too aggressive and creating...

Fixed bug #104998: Synchronization of d2d srcs too aggressive and creating issues for draining cancelation
parent 97d02e37
......@@ -36,6 +36,7 @@ import random
import socket
import dlf
from transfermanagerdlf import msgs
import transfer
import connectionpool, diskserverlistcache
class SynchronizerThread(threading.Thread):
......@@ -44,9 +45,11 @@ class SynchronizerThread(threading.Thread):
transfers, meaning that it will check that transfers running for already long according to the DB
are effectively still running. If they are not, is will update the DB accordingly'''
def __init__(self):
def __init__(self, queueingTransfers):
'''constructor of this thread'''
super(SynchronizerThread, self).__init__(name='Synchronizer')
# the list of queueing transfers
self.queueingTransfers = queueingTransfers
# whether we should continue running
self.running = True
# whether we are connected to the stager DB
......@@ -138,7 +141,7 @@ class SynchronizerThread(threading.Thread):
try:
# list d2d source running and handled by this transfer manager
allTMD2dSrc = connectionpool.connections.getAllRunningD2dSourceTransfers(self.hostname, timeout=None)
allTMD2dSrcSet = set([transferid for transferid, reqid, fileid in allTMD2dSrc])
allTMD2dSrcSet = set([transferid for transferid, reqid_unused, fileid_unused in allTMD2dSrc])
except connectionpool.Timeout:
# we could not list all pending running transfers in the system because of timeouts
# Thus we have to give up with synchronization for this round
......@@ -160,10 +163,16 @@ class SynchronizerThread(threading.Thread):
stcur.callproc('getSchedulerD2dTransfers', [subReqIdsCur])
subReqIds = set([t[0] for t in subReqIdsCur.fetchall()])
# find out the set of these d2d source transfers that are no more in the DB
transfersToEnd = list(allTMD2dSrcSet - subReqIds)
transfersToEnd = allTMD2dSrcSet - subReqIds
# remove the ones for which a destination is still pending
# (they will be cleaned up by the destination job)
pendingD2dDst = set([transferId for transferId, transferType_unused \
in self.queueingTransfers.transfersLocations.keys() \
if transferType_unused == transfer.TransferType.D2DDST])
transfersToEnd = list(transfersToEnd - pendingD2dDst)
# and end them
if transfersToEnd:
tid2fileid = dict([(transferid, fileid) for transferid, reqid, fileid in allTMD2dSrc])
tid2fileid = dict([(transferid, fileid) for transferid, reqid_unused, fileid in allTMD2dSrc])
for transferid in transfersToEnd:
# 'Transfer ended by synchronization as the transfer disappeared from the DB' message
dlf.write(msgs.SYNCHROENDEDTRANSFER, subreqid=transferid, fileid=tid2fileid[transferid])
......
......@@ -666,7 +666,7 @@ try:
# launch a processing thread that will regularly synchronize the stager DB with the running
# transfers, meaning that it will check that transfers running for already long according to the DB
# are effectively still running. If they are not, is will update the DB accordingly
synchroThread = synchronizer.SynchronizerThread()
synchroThread = synchronizer.SynchronizerThread(queueingTransfers)
# starts listening
transfermanager.start()
except SystemExit:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment