diff --git a/castor/scheduler/diskmanager/runningtransfersset.py b/castor/scheduler/diskmanager/runningtransfersset.py index 218c0493d1ee8587a4d8dbb4a7632f1d729e68ab..b2fbe8d4d09e0d21d44d23312dfc5927c79c4ebc 100644 --- a/castor/scheduler/diskmanager/runningtransfersset.py +++ b/castor/scheduler/diskmanager/runningtransfersset.py @@ -28,14 +28,14 @@ '''runningtransferset module of the CASTOR disk server manager. Handle a set of running transfers on a given diskserver''' -import os, socket +import os, socket, subprocess import threading import time import dlf from diskmanagerdlf import msgs import castor_tools import connectionpool -from transfer import cmdLineToTransfer, cmdLineToTransferId, TransferType, TapeTransfer, TapeTransferType +from transfer import cmdLineToTransfer, cmdLineToTransferId, TransferType, TapeTransferType from reporter import StreamCount class RunningTransfersSet(object): @@ -151,7 +151,7 @@ class RunningTransfersSet(object): # kill what can be killed toBeKilled = set(rTransfer for rTransfer in self.transfers if rTransfer.transfer.transferId in transferIds) for rTransfer in toBeKilled: - if rTransfer.process != None: + if type(rTransfer.process) == subprocess.Popen: rTransfer.process.terminate() # cleanup list of running transfers self.transfers = set(rTransfer for rTransfer in self.transfers if rTransfer.transfer.transferId not in transferIds) @@ -291,6 +291,51 @@ class RunningTransfersSet(object): self.tapelock.release() return n + def _isTransferOver(self, rTransfer): + '''checks if the given running transfer is still alive, and returns its return code''' + transferId = rTransfer.transfer.transferId + # check whether the transfer is over + isEnded = False + rc = None + if transferId in self.leftOverTransfers: + # special care for left over transfers, we use a signal 0 for them as they are not our children + try: + pid = self.leftOverTransfers[transferId] + os.kill(pid, 0) + # a process with this pid exists, now is it really our guy or something new ? + cmdline = open(os.path.sep+os.path.join('proc', str(pid), 'cmdline'), 'rb').read().split('\0') + if transferId != cmdLineToTransferId(cmdline): + # it's a new one, not our guy + isEnded = True + except OSError: + # process is dead + isEnded = True + if isEnded: + rc = -1 + del self.leftOverTransfers[transferId] + elif rTransfer.process != None: + # check regular transfers (non left over), except for d2dsrc transfers or transfers + # that have just been scheduled and not yet started as they have no process + try: + rc = rTransfer.process.poll() + isEnded = (rc != None) + except AttributeError: + # this is a running (xrootd) transfer for which we have no process associated, so poll() fails: + # in this case we must assume the transfer is still running + isEnded = False + else: + if self.fake: + isEnded = True + else: + # running transfers without a process are the ones for which a slot has been allocated + # but the client has not come yet. They must be timed out to avoid a slot leak - this would + # be the case of xrootd clients in particular, due to the stalling mechanism, but also of + # any client that does not reconnect after the call back. + if rTransfer.transfer.transferType == TransferType.STD and \ + rTransfer.startTime + rTransfer.transfer.getTimeout() < time.time(): + isEnded = True + return isEnded, rc + def poll(self): '''checks for finished transfers and clean them up''' killedTransfers = {} @@ -301,48 +346,7 @@ class RunningTransfersSet(object): d2dEnded = {} for rTransfer in self.transfers: transferId = rTransfer.transfer.transferId - # check whether the transfer is over - isEnded = False - if transferId in self.leftOverTransfers: - # special care for left over transfers, we use a signal 0 for them as they are not our children - try: - pid = self.leftOverTransfers[transferId] - os.kill(pid, 0) - # a process with this pid exists, now is it really our guy or something new ? - cmdline = open(os.path.sep+os.path.join('proc', str(pid), 'cmdline'), 'rb').read().split('\0') - if transferId != cmdLineToTransferId(cmdline): - # it's a new one, not our guy - isEnded = True - except OSError: - # process is dead - isEnded = True - if isEnded: - rc = -1 - del self.leftOverTransfers[transferId] - elif rTransfer.process != None: - # check regular transfers (non left over), except for d2dsrc transfers or transfers - # that have just been scheduled and not yet started as they have no process - try: - rc = rTransfer.process.poll() - isEnded = (rc != None) - except AttributeError: - # this is a running (xrootd) transfer for which we have no process associated, so poll() fails: - # in this case we must assume the transfer is still running - isEnded = False - else: - if self.fake: - isEnded = True - else: - # running transfers without a process are the ones for which a slot has been allocated - # but the client has not come yet. They must be timed out to avoid a slot leak - this would - # be the case of xrootd clients in particular, due to the stalling mechanism, but also of - # any client that does not reconnect after the call back. - if rTransfer.startTime + rTransfer.transfer.getTimeout() < time.time(): - failedTransfers.append((rTransfer.scheduler, rTransfer.transfer, \ - 1004, 'Timed out waiting for client connection')) # SETIMEDOUT - ended.append(transferId) - isEnded = True - rc = None # the transfer didn't even take place + isEnded, rc = self._isTransferOver(rTransfer) if isEnded: # "transfer ended" message dlf.writedebug(msgs.TRANSFERENDED, subreqId=transferId, reqId=rTransfer.transfer.reqId, @@ -368,6 +372,10 @@ class RunningTransfersSet(object): rc = 2 # ENOENT d2dEnded[rTransfer.scheduler].append((rTransfer.transfer, rTransfer.localPath, replicaFileSize, rc, errMsg)) + elif rc == None: + # the transfer didn't take place + failedTransfers.append((rTransfer.scheduler, rTransfer.transfer, \ + 1004, 'Timed out waiting for client connection')) # SETIMEDOUT elif rc < 0: # in case of transfers killed by a signal, remember to inform the DB if rTransfer.scheduler not in killedTransfers: killedTransfers[rTransfer.scheduler] = [] @@ -376,7 +384,8 @@ class RunningTransfersSet(object): rTransfer.transfer.fileId, rc, errMsg, rTransfer.transfer.reqId)) elif rc > 0: # these are transfers that got interrupted or somehow failed - failedTransfers.append((rTransfer.scheduler, rTransfer.transfer, 1015, 'Mover exited with failure, rc=%d' % rc)) # SEINTERNAL + failedTransfers.append((rTransfer.scheduler, rTransfer.transfer, + 1015, 'Mover exited with failure, rc=%d' % rc)) # SEINTERNAL # cleanup ended transfers self.transfers = set(rTransfer for rTransfer in self.transfers if rTransfer.transfer.transferId not in ended) finally: