diff --git a/castor/scheduler/diskmanager/moverhandler.py b/castor/scheduler/diskmanager/moverhandler.py index 694a8eba86693cbb0451f5ce4ac638e0e7df10c3..65b598be596318fbfd0f944800cd3ed1eecb8c11 100644 --- a/castor/scheduler/diskmanager/moverhandler.py +++ b/castor/scheduler/diskmanager/moverhandler.py @@ -28,7 +28,7 @@ import threading, socket, time, Queue, os, ast import connectionpool, dlf from diskmanagerdlf import msgs -from transfer import TapeTransfer, TapeTransferType, RunningTransfer +from transfer import TransferType, TapeTransfer, TapeTransferType, D2DTransferType class MoverReqHandlerThread(threading.Thread): '''Worker thread handling each mover request''' @@ -85,7 +85,7 @@ class MoverReqHandlerThread(threading.Thread): self.runningTransfers.remove(t) self.runningTransfers.failTransfer(t.scheduler, t.transfer, errCode, 'Error while opening the file') else: - # Anything not None is OK here, see runningtransferset.py + # No xroot error and we found it: set process to something not None, see runningtransferset.py self.runningTransfers.setProcess(transferid, 0) elif transferType == 'tape': # this is a tape transfer: take note @@ -134,13 +134,15 @@ class MoverReqHandlerThread(threading.Thread): closeTime = time.time() # call transferEnded with the given arguments attempt = 0 - try: - while True: - try: - # log "Transfer ended" - dlf.write(msgs.TRANSFERENDED, subreqId=transferid, reqId=t.transfer.reqId, fileId=t.transfer.fileId, - flags=t.transfer.flags, fileSize=fSize, errCode=errCode, errMessage=errMessage, \ - totalTime="%.6f" % (closeTime-t.transfer.creationTime), schedulerTime="%.6f" % (closeTime-t.transfer.submissionTime)) + while True: + try: + # log "Transfer ended" + dlf.write(msgs.TRANSFERENDED, subreqId=transferid, reqId=t.transfer.reqId, fileId=t.transfer.fileId, \ + transferType=(t.transfer.flags if t.transfer.transferType == TransferType.STD else D2DTransferType.toStr(t.transfer.replicationType)), + fileSize=fSize, errCode=errCode, errMessage=errMessage, \ + totalTime="%.6f" % (closeTime-t.transfer.creationTime), schedulerTime="%.6f" % (closeTime-t.transfer.submissionTime)) + if t.transfer.transferType == TransferType.STD: + # user transfer rc, errMsg = connectionpool.connections.transferEnded(t.scheduler, \ (transferid, t.transfer.reqId, t.transfer.fileId, t.transfer.flags, \ int(fSize), closeTime, cksumType, cksumValue, int(errCode), errMessage), \ @@ -151,21 +153,34 @@ class MoverReqHandlerThread(threading.Thread): else: dlf.writedebug(msgs.TRANSFERENDED, subreqId=transferid, errCode=0) return '%d %s\n' % (rc, errMsg) - except connectionpool.Timeout, e: - # as long as we get a timeout, we retry up to 3 times - attempt += 1 - if attempt < 3: - dlf.writedebug(msgs.TRANSFERENDED, subreqId=transferid, errMessage='Timeout, sleeping %d seconds' % attempt) - time.sleep(attempt) - else: - # give up, inform mover - raise Exception('Timeout attempting to end transfer %s in scheduler %s' % (transferid, t.scheduler)) - except Exception, e: - # any other error, we give up and inform the mover - # "Caught exception in MoverHandler thread" message, error = SEINTERNAL - dlf.writeerr(msgs.MOVERHANDLEREXCEPTION, Type=str(e.__class__), Message=str(e)) - # report error to the mover - return '%d %s\n' % (1015, 'Error closing the file in the stager') + elif t.transfer.transferType == TransferType.D2DDST: + # a destination disk-to-disk copy needs to be notified to the scheduler, with no return code + connectionpool.connections.d2dEnded(t.scheduler, + tuple([(transferid, t.transfer.reqId, t.transfer.fileId, + socket.getfqdn(), t.localPath, int(fSize), errCode, errMessage)]), + timeout=timeout) + return '0\n' + elif t.transfer.transferType == TransferType.D2DSRC: + # a source disk-to-disk copy does not need to be notified + return '0\n' + else: + raise ValueError('Invalid transfer type %d for transfer %s' % (t.transfer.transferType, transferid)) + except connectionpool.Timeout, e: + # as long as we get a timeout, we retry up to 3 times + attempt += 1 + if attempt < 3: + dlf.writedebug(msgs.TRANSFERENDED, subreqId=transferid, errMessage='Timeout, sleeping %d seconds' % attempt) + time.sleep(attempt) + else: + # give up, inform mover + dlf.writeerr(msgs.MOVERHANDLEREXCEPTION, Message='Timeout attempting to end transfer %s in scheduler %s' % (transferid, t.scheduler)) + return '%d %s\n' % (1015, 'Error closing the file in the stager') + except Exception, e: + # any other error, we give up and inform the mover + # "Caught exception in MoverHandler thread" message, error = SEINTERNAL + dlf.writeerr(msgs.MOVERHANDLEREXCEPTION, Type=str(e.__class__), Message=str(e)) + # report error to the mover + return '%d %s\n' % (1015, 'Error closing the file in the stager') def handleRequest(self, data): '''