Commit 3b6e794a authored by Giuseppe Lo Presti's avatar Giuseppe Lo Presti
Browse files

Minor improvements + added proper documentation

parent 150519ad
......@@ -42,8 +42,7 @@ class MoverSocket(object):
class ClientsListenerThread(threading.Thread):
'''the clients listener thread.
This thread is responsible for listening to clients callbacks, so that movers
can be executed in inetd mode only when the client has already connected. In case of
timeout, a call to transferEnded is executed to fail the transfer.
can be executed in inetd mode only when the client has already connected.
During the listening time, the transfer slot is already reserved (cf. activitycontrol),
so that when the client arrives, the transfer starts immediately.
'''
......
......@@ -48,7 +48,7 @@ msgs = dlf.enum('INVOKINGSCHEDULETRANSFER', 'INVOKINGSUMMARIZETRANSFERS',
'SCHEDFROMBACKFILL', 'PUTJOBINBACKFILL', 'SCHEDUSERJOBUNDERPRESSURE',
'SCHEDPRIORITY', 'AVOIDBACKFILLSTARV', 'SCHEDUSERJOB',
'MOVERHANDLEREXCEPTION', 'CLIENTSREPLIEREXCEPTION', 'CLIENTSLISTENEREXCEPTION',
'MOVERSTARTING', 'MOVERSTARTFAILED', 'TRANSFERTIMEDOUT')
'MOVERSTARTING', 'MOVERSTARTFAILED', 'MOVERCALL', 'TRANSFERTIMEDOUT')
# initialization of the messages
dlf.addmessages({msgs.INVOKINGSCHEDULETRANSFER : 'Invoking scheduleTransfer',
......@@ -104,5 +104,6 @@ dlf.addmessages({msgs.INVOKINGSCHEDULETRANSFER : 'Invoking scheduleTransfer',
msgs.CLIENTSLISTENEREXCEPTION : 'Caught exception in ClientsListener thread',
msgs.MOVERSTARTING : 'Executing mover',
msgs.MOVERSTARTFAILED : 'Failed to execute mover',
msgs.MOVERCALL : 'Received call from mover',
msgs.TRANSFERTIMEDOUT : 'Transfer slot timed out'})
......@@ -32,7 +32,7 @@ from diskmanagerdlf import msgs
class MoverHandlerThread(threading.Thread):
'''the mover handler thread.
This thread is responsible for handling the close of the files as requested by the different movers.
This thread is responsible for handling the open and close of the files as requested by the different movers.
It is not multithreaded, it simply dispatches any connection to the handleRequest method.
'''
......@@ -46,12 +46,23 @@ class MoverHandlerThread(threading.Thread):
self.setDaemon(True)
self.start()
def handleOpen(self, transferid):
'''handle an OPEN call'''
def handleOpen(self, payload):
'''handle an OPEN call. The protocol is as follows:
- The mover connects to localhost:15511 [DiskManager/MoverHandlerPort in castor.conf]
- It sends a single string like
OPEN <transferUUID>
- It synchronously waits for a single answer like
<rc>[ <error message>]\n
either: "0" for success
or: "2 Transfer has disappeared from the scheduling system"
in case the slot is not available any longer
'''
try:
t = self.runningTransfers.get(transferid)
# found it, update its process. Anything not None is OK here, see runningtransferset.py
t.process = 0
# parse payload, ignore anything after first string
transferid = payload.split()[0]
# Look up for this transferid and update its process.
# Anything not None is OK here, see runningtransferset.py
self.runningTransfers.setProcess(transferid, 0)
return "0"
except KeyError:
# transfer not found: typically it already timed out, so we log this
......@@ -60,7 +71,15 @@ class MoverHandlerThread(threading.Thread):
raise
def handleClose(self, payload):
'''handle a CLOSE call'''
'''handle a CLOSE call. The protocol is as follows:
- The mover connects to localhost:15511 [DiskManager/MoverHandlerPort in castor.conf]
- It sends a single string like
CLOSE <transferUUID> <fileSize> <cksumType> <cksumValue> <errorCode>[ <error message>]
[In case of Get requests, fileSize, cksumType, and cksumValue are ignored]
- It synchronously waits for a single answer like
<rc>[ <error message>]\n
'''
# parse payload, throw IndexError on malformed input
transferid, fSize, cksumType, cksumValue, errCode = payload.split()[0:5]
if errCode != '0':
errMessage = payload[payload.find(' '+ errCode +' ')+1:].split(' ', 1)[1] # the error message is any string at the end
......@@ -108,19 +127,14 @@ class MoverHandlerThread(threading.Thread):
return '%d %s\n' % (1015, 'Error closing the file in the stager')
def handleRequest(self, data):
'''the requests handler.
Only closing files is supported as of now, with the following protocol:
- The mover connects to localhost:15511 [DiskManager/MoverHandlerPort in castor.conf]
- It sends a single string like
CLOSE <transferUUID> <fileSize> <cksumType> <cksumValue> <errorCode>[ <error message>]
[In case of Get requests, fileSize, cksumType, and cksumValue are ignored]
- It synchronously waits for a single answer like
<rc>[ <error message>]\n
'''
The requests handler for the mover open/close protocol.
As this is a local protocol, there's no magic number protection
'''
# parse input, bail out on any parsing error
try:
key, payload = data.split(' ', 1)
dlf.writedebug(msgs.MOVERCALL, operation=key, payload=payload)
if key == 'OPEN':
return self.handleOpen(payload)
elif key == 'CLOSE':
......@@ -150,11 +164,11 @@ class MoverHandlerThread(threading.Thread):
sock.bind(('localhost', self.config.getValue('DiskManager', 'MoverHandlerPort', 15511)))
sock.listen(5)
except Exception, e:
# "Caught exception in CloseHandler thread" message
dlf.writeemerg(msgs.CLOSEHANDLEREXCEPTION, \
# "Caught exception in MoverHandler thread" message
dlf.writeemerg(msgs.MOVERHANDLEREXCEPTION, \
error='Could not bind to the mover handler port: %s. Terminating' % str(e), \
port=self.config.getValue('DiskManager', 'MoverHandlerPort', 15511))
# this is fatal as no mover can close any file, therefore exit
# this is fatal as no mover would be able to close any file, therefore exit
raise SystemExit
while self.alive:
......
......@@ -39,7 +39,8 @@ from transfer import cmdLineToTransfer, cmdLineToTransferId, TransferType, TapeT
from reporter import StreamCount
class RunningTransfersSet(object):
'''handles a list of running transfers and is able to poll them regularly and list the ones that ended'''
'''handles a list of running transfers and is able to poll them regularly and list the ones that ended.
Moreover, in case of a timeout transferEnded is called to fail the transfer.'''
def __init__(self, fake=False):
'''constructor'''
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment