Commit fe9c6c70 authored by Giuseppe Lo Presti
Browse files

Added support for an OPEN call from movers, targeting xrootd

parent c7e3ac8d
......@@ -202,7 +202,7 @@ class ActivityControlThread(threading.Thread):
else:
# xroot is special here: we don't create a socket for the mover, but instead we tell
# the xroot server (through the redirector) to use our mover handler port for telling
# us when the file was effectively opened and closed
# us when the file is effectively opened and closed
moverPort = self.configuration.getValue('DiskManager', 'MoverHandlerPort', 15511)
# and on top, we have to tell the physical destination as the path in the response
destPath = transfer.destDcPath
......
......@@ -26,7 +26,7 @@
"""clients listener thread of the disk server manager daemon of CASTOR."""
import threading, socket, select, time, random
import connectionpool, dlf
import dlf
from diskmanagerdlf import msgs
......@@ -56,6 +56,7 @@ class ClientsListenerThread(threading.Thread):
self.config = config
# an fd -> MoverSocket dictionary for keeping track of the outstanding movers
self.outstandingMovers = {}
# a multi-poll structure to look for incoming connections from clients
self.clientsPoll = select.poll()
# start the thread
self.setDaemon(True)
......@@ -114,12 +115,10 @@ class ClientsListenerThread(threading.Thread):
# check timeout for all outstanding movers
for moverSock in self.outstandingMovers.values():
if moverSock.expirationTime < time.time():
# the client for this mover did not come on time, fail the transfer
self.runningTransfers.failTransfer(moverSock.qTransfer.scheduler, moverSock.qTransfer.transfer, 1004, # SETIMEDOUT
'Timed out waiting for client connection', removeFromRunningSet=True)
# and drop it from our structures
# the client for this mover did not come on time, drop it from our structures
self.clientsPoll.unregister(moverSock.socket.fileno())
del self.outstandingMovers[moverSock.socket.fileno()]
# the transfer will be failed by the manager thread polling all running transfers
except Exception, e:
# "Caught exception in ClientsListener thread" message
dlf.writeerr(msgs.CLIENTSLISTENEREXCEPTION, Type=str(e.__class__), Message=str(e))
......
......@@ -25,7 +25,7 @@
"""mover handler thread of the disk server manager daemon of CASTOR."""
import threading, sys, socket, time
import threading, socket, time
import connectionpool, dlf
from diskmanagerdlf import msgs
......@@ -46,6 +46,19 @@ class MoverHandlerThread(threading.Thread):
self.setDaemon(True)
self.start()
def handleOpen(self, transferid):
    '''Handle an OPEN call from a mover.

    Looks the transfer up in the running transfers set and flags it as started by
    giving it a non-None process, so that it is no longer considered as a transfer
    still waiting for its client.

    transferid is the payload of the OPEN message, i.e. the identifier of the transfer
    being opened. Surrounding whitespace (e.g. a trailing newline left over from the
    wire message) is ignored, in line with the whitespace-tolerant parsing of CLOSE.

    Returns the string "0" on success.
    Raises KeyError when the transfer is unknown; the event is logged before re-raising.
    '''
    # normalize the identifier: the raw payload may still carry the line terminator,
    # and a lookup with it would wrongly be reported as a timed out transfer
    transferid = transferid.strip()
    try:
        t = self.runningTransfers.get(transferid)
    except KeyError:
        # transfer not found: typically it already timed out, so we log this
        # "Transfer slot timed out" message
        dlf.writenotice(msgs.TRANSFERTIMEDOUT, subreqId=transferid)
        raise
    # found it, update its process. Anything not None is OK here, see runningtransferset.py
    t.process = 0
    return "0"
def handleClose(self, payload):
'''handle a CLOSE call'''
transferid, fSize, cksumType, cksumValue, errCode = payload.split()[0:5]
......@@ -78,8 +91,6 @@ class MoverHandlerThread(threading.Thread):
if rc != 0:
# log "Failed to end the transfer"
dlf.writenotice(msgs.TRANSFERENDEDFAILED, subreqId=transferid, reqId=t.transfer.reqId, errCode=rc, errMessage=errMsg)
# return result to the mover
dlf.writedebug(msgs.TRANSFERENDED, returnCode=rc, returnMessage=errMsg)
return '%d %s\n' % (rc, errMsg)
except connectionpool.Timeout, e:
# as long as we get a timeout, we retry up to 3 times
......@@ -110,7 +121,9 @@ class MoverHandlerThread(threading.Thread):
# parse input, bail out on any parsing error
try:
key, payload = data.split(' ', 1)
if key == 'CLOSE':
if key == 'OPEN':
return self.handleOpen(payload)
elif key == 'CLOSE':
return self.handleClose(payload)
else:
raise ValueError
......
......@@ -376,11 +376,27 @@ class RunningTransfersSet(object):
elif rTransfer.process != None:
# check regular transfers (non left over), except for d2dsrc transfers or transfers
# that have just been scheduled and not yet started as they have no process
try:
rc = rTransfer.process.poll()
isEnded = (rc != None)
except AttributeError:
# this is a running (xrootd) transfer for which we have no process associated, so poll() fails:
# in this case we must assume the transfer is still running
isEnded = False
else:
if self.fake:
isEnded = True
else:
rc = rTransfer.process.poll()
isEnded = (rc != None)
# running transfers without a process are the ones for which a slot has been allocated
# but the client has not come yet. They must be timed out to avoid a slot leak - this would
# be the case of xrootd clients in particular, due to the stalling mechanism, but also of
# any client that does not reconnect after the call back.
if rTransfer.startTime + rTransfer.transfer.getTimeout() < time.time():
failedTransfers.append((rTransfer.scheduler, rTransfer.transfer, \
1004, 'Timed out waiting for client connection')) # SETIMEDOUT
ended.append(transferId)
isEnded = True
rc = None # the transfer didn't even take place
if isEnded:
# "transfer ended" message
dlf.writedebug(msgs.TRANSFERENDED, subreqId=transferId, reqId=rTransfer.transfer.reqId,
......@@ -467,7 +483,7 @@ class RunningTransfersSet(object):
for scheduler, transfer, rc, msg in failedTransfers:
try:
# try to fail the transfer on our side
self.failTransfer(scheduler, transfer, rc, msg, removeFromRunningSet=False) # SEINTERNAL
self.failTransfer(scheduler, transfer, rc, msg) # SEINTERNAL
except Exception, e:
# "Failed to end the transfer" message
dlf.writeerr(msgs.TRANSFERENDEDFAILED, type=str(e.__class__), message=str(e), originalErrorMessage=msg)
......@@ -532,18 +548,12 @@ class RunningTransfersSet(object):
finally:
self.lock.release()
def failTransfer(self, scheduler, transfer, errCode, errMessage, removeFromRunningSet):
'''fail a transfer with the given error code and message'''
def failTransfer(self, scheduler, transfer, errCode, errMessage):
'''Fail a transfer with the given error code and message. The transfer is assumed to have already been dropped
from the list of running transfers, and thus no lock is taken for this operation.'''
# get the admin timeout
timeout = self.config.getValue('TransferManager', 'AdminTimeout', 5, float)
closeTime = time.time()
if removeFromRunningSet:
try:
# first take the transfer out of the running set
self.lock.acquire()
self.transfers = set(rTransfer for rTransfer in self.transfers if rTransfer.transfer.transferId != transfer.transferId)
finally:
self.lock.release()
# call transferEnded to fail the transfer
attempt = 0
while True:
......
......@@ -306,7 +306,7 @@ class Transfer(BaseTransfer):
t = Transfer.configuration.getValue('RFIOD', 'CONNTIMEOUT', 10)
elif self.protocol == 'gsiftp':
t = Transfer.configuration.getValue('GSIFTP', 'TIMEOUT', 180)
elif self.protocol == 'xrootd':
elif self.protocol == 'xroot':
t = Transfer.configuration.getValue('XROOT', 'TIMEOUT', 300)
else:
raise TypeError('No valid timeout value for protocol %s' % self.protocol)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment