Commit a69e9a59 authored by Giuseppe Lo Presti's avatar Giuseppe Lo Presti
Browse files

Renamed runningtransfersset.remove() into kill() and introduced a real...

Renamed runningtransfersset.remove() into kill() and introduced a real remove() method without side effects, that is used in two places
parent 7a8d4aa8
......@@ -147,7 +147,7 @@ class ActivityControlThread(threading.Thread):
# log "Failed to execute mover" with all details about the transfer
dlf.writeerr(msgs.MOVERSTARTFAILED, transfer=transfer, error=str(e))
# clean up this transfer from the list of running transfers
self.runningTransfers.remove([transfer.transferId])
self.runningTransfers.remove(transfer)
def run(self):
'''main method, containing the infinite loop'''
......
......@@ -120,8 +120,8 @@ class DiskServerManagerService(rpyc.Service):
dlf.writedebug(msgs.INVOKINGKILLTRANSFERS, TransferIds=', '.join(transferIds))
# remove transfers from the local queue
transferQueue.remove(transferIds)
# in case it's running, remove it from the running queue
runningTransfers.remove(transferIds)
# in case it's running, remove and kill it from the running queue
runningTransfers.kill(transferIds)
def exposed_transferAlreadyStarted(self, transferId, reqId):
'''Called when a job has started on another diskServer and can be canceled on this one'''
......@@ -147,7 +147,7 @@ class DiskServerManagerService(rpyc.Service):
'''called when a d2d copy is over and we are the source'''
transfer = tupleToTransfer(transferTuple)
dlf.writedebug(msgs.INVOKINGD2DEND, subreqId=transfer.transferId, reqId=transfer.reqId)
runningTransfers.remove([transfer.transferId])
runningTransfers.kill([transfer.transferId])
def exposed_anyTransfersFromScheduler(self, scheduler):
'''Tells whether any transfer is queueing/ongoing that is handled
......
......@@ -95,7 +95,7 @@ class MoverReqHandlerThread(threading.Thread):
# find transfer in runningTransfers, raise KeyError if not found
t = self.runningTransfers.get(transferid)
# remove it from there as it is not running any longer
self.runningTransfers.remove([transferid])
self.runningTransfers.remove(t)
# get the admin timeout
timeout = self.config.getValue('TransferManager', 'AdminTimeout', 5, float)
closeTime = time.time()
......
......@@ -221,8 +221,8 @@ class RunningTransfersSet(object):
finally:
self.lock.release()
def remove(self, transferIds):
'''removes a transfer from the list, and kills corresponding process, when possible'''
def kill(self, transferIds):
'''removes a set of transfers from the list, and kills corresponding process, when possible'''
self.lock.acquire()
try:
# kill what can be killed
......@@ -235,6 +235,16 @@ class RunningTransfersSet(object):
finally:
self.lock.release()
def remove(self, transfer):
'''removes a transfer from the list, assuming the corresponding mover process is gone'''
self.lock.acquire()
try:
self.transfers.remove(transfer)
except KeyError:
pass
finally:
self.lock.release()
def nbTransfers(self, reqUser=None, detailed=False):
'''returns number of running transfers and number of running slots, plus details
per protocol if the detailed parameter is true.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment