dispatcher.py 24.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#/******************************************************************************
# *                   dispatcher.py
# *
# * This file is part of the Castor project.
# * See http://castor.web.cern.ch/castor
# *
# * Copyright (C) 2003  CERN
# * This program is free software; you can redistribute it and/or
# * modify it under the terms of the GNU General Public License
# * as published by the Free Software Foundation; either version 2
# * of the License, or (at your option) any later version.
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# *
# *
21
22
# * dispatcher class of the transfer manager of the CASTOR project
# * this class is responsible for polling the DB for transfers to dispatch and
23
24
25
26
27
# * effectively dispatch them on the relevant diskservers.
# *
# * @author Castor Dev team, castor-dev@cern.ch
# *****************************************************************************/

28
29
30
31
'''dispatcher module of the transfer manager daemon.
Handles the polling from the stager DB for new requests
and their dispatching to the proper diskservers'''

32
import time
import threading
import socket
import struct
import copy
import Queue

import cx_Oracle

import castor_tools
import connectionpool
import dlf
from transfermanagerdlf import msgs
from transfer import D2DTransfer, Transfer, TransferType
42

43
class WorkerThread(threading.Thread):
44
  '''Worker thread, responsible for scheduling effectively the transfers on the diskservers'''
45
46
47

  def __init__(self, workqueue):
    '''constructor'''
48
    super(WorkerThread, self).__init__(name='Worker')
49
50
51
52
53
    # the queue to work with
    self.workqueue = workqueue
    # whether to continue running
    self.running = True
    # start the thread
54
    self.setDaemon(True)
55
56
57
58
59
    self.start()

  def stop(self):
    '''Stops the thread processing'''
    self.running = False
60

61
62
63
64
  def run(self):
    '''main method to the threads. Only get work from the queue and do it'''
    while self.running:
      try:
65
66
67
68
        func, args = self.workqueue.get(True)
        # func may be None in case we wanted to exit the blocking get in order to close the service
        if func:
          func(*args)
Sebastien Ponce's avatar
Sebastien Ponce committed
69
70
71
72
      except Queue.Empty:
        # we've timed out, let's just retry. We only use the timeout so that this
        # thread can stop even if there is nothing in the queue
        pass
73
      except Exception, e:
74
        # "Caught exception in Worker thread" message
75
        dlf.writeerr(msgs.WORKEREXCEPTION, Type=str(e.__class__), Message=str(e))
76

77
class DBUpdaterThread(threading.Thread):
  '''Worker thread, responsible for updating DB asynchronously and in bulk after the transfer scheduling'''

  def __init__(self, workqueue):
    '''constructor. workqueue is the queue of (transferId, fileId, errcode, errmsg, reqid)
    failure tuples to be pushed to the stager DB'''
    super(DBUpdaterThread, self).__init__(name='DBUpdater')
    # whether we are connected to the stager DB (lazily created, see dbConnection)
    self.stagerConnection = None
    # the queue to work with
    self.workqueue = workqueue
    # whether to continue running
    self.running = True
    # start the thread as a daemon
    self.setDaemon(True)
    self.start()

  def stop(self):
    '''Stops the thread processing'''
    self.running = False

  def dbConnection(self):
    '''returns a connection to the stager DB.
    The connection is cached and reconnections are handled'''
    if self.stagerConnection == None:
      self.stagerConnection = castor_tools.connectToStager()
      # autocommit so that each execute is committed on its own
      self.stagerConnection.autocommit = True
    return self.stagerConnection

  def run(self):
    '''main method to the threads. Only get work from the queue and do it'''
    try:
      while self.running:
        # get something from the queue and then empty the queue and list all the updates to be done in one bulk
        failures = []
        # check whether there is something to do: we don't use timeouts because they cause spin locks,
        # thus we rely on the stopper to push a None entry (cf. Dispatcher.join)...
        transferId, fileId, errcode, errmsg, reqid = self.workqueue.get(True)
        # ...in case we wanted to exit the blocking get in order to close the service
        if transferId:
          failures.append((transferId, fileId, errcode, errmsg, reqid))
        else:
          # we got None, we're about to exit
          continue
        # empty the queue so that we go only once to the DB
        try:
          while True:
            # non-blocking get: raises Queue.Empty when the queue is drained
            transferId, fileId, errcode, errmsg, reqid = self.workqueue.get(False)
            # skip None, this is a fake message because we are about to exit, so don't process
            if transferId:
              failures.append((transferId, fileId, errcode, errmsg, reqid))
        except Queue.Empty:
          # we are over, the queue is empty
          pass
        # Now call the DB for failures
        # transpose the list of tuples into per-column lists; only the
        # transferIds, errcodes and errmsgs columns go to the DB call, the
        # fileId/reqid columns are used for logging below
        data = zip(*failures)
        transferIds = data[0]
        errcodes = data[2]
        errmsgs = data[3]
        try:
          stcur = self.dbConnection().cursor()
          try:
            # bulk call failing all collected transfers in one go
            stcur.execute("BEGIN transferFailedLockedFile(:1, :2, :3); END;", [list(transferIds), list(errcodes), list(errmsgs)])
            for transferId, fileId, errcode, errmsg, reqid in failures:
              # 'Failed transfer' message
              dlf.writenotice(msgs.FAILEDTRANSFER, subreqid=transferId, reqid=reqid, fileId=fileId, ErrorCode=errcode, ErrorMessage=errmsg)
          finally:
            stcur.close()
        except Exception, e:
          for transferId, fileId, errcode, errmsg, reqid in failures:
            # 'Exception caught while failing transfer' message
            dlf.writeerr(msgs.FAILINGTRANSFEREXCEPTION, subreqid=transferId, reqid=reqid, fileId=fileId, Type=str(e.__class__), Message=str(e))
          # check whether we should reconnect to DB, and do so if needed
          self.dbConnection().checkForReconnection(e)
    finally:
      # release the cached DB connection on the way out, best effort
      if self.stagerConnection != None:
        try:
          castor_tools.disconnectDB(self.stagerConnection)
        except Exception:
          pass
156

157
158
159
160
161
def inttoip(n):
  '''converts a integer into human readble IP address'''
  if n < 0:
    n = (1<<32) + n
  return socket.inet_ntoa(hex(n)[2:].zfill(8).decode('hex'))
162

163
164
165
166
167
class AbstractDispatcherThread(threading.Thread):
  '''abstract version of a scheduling thread.
     Contains all the threading code, the db connection code and the scheduling code.
     Implementations should only implement the run method to connect to the stager,
     get transfers and call the _schedule method.'''

  def __init__(self, queueingTransfers, name, nbWorkers=5):
    '''constructor of this thread. Arguments are the connection pool and the transfer queue to use'''
    super(AbstractDispatcherThread, self).__init__(name=name)
    # whether we should continue running
    self.running = True
    # the list of queueing transfers
    self.queueingTransfers = queueingTransfers
    # our own name
    self.hostname = socket.getfqdn()
    # a counter of number of scheduled transfers in the current second
    self.nbTransfersScheduled = 0
    # the current second, so that we can reset the previous counter when it changes
    self.currentSecond = 0
    # whether we are connected to the stager DB
    self.stagerConnection = None
    # a queue of work to be done by the workers; bounded to 2*nbWorkers so the
    # DB polling thread blocks rather than building an unbounded backlog
    self.workToDispatch = Queue.Queue(2*nbWorkers)
    # a queue of updates to be done in the DB
    self.updateDBQueue = Queue.Queue()
    # a thread pool of Schedulers
    self.workers = []
    for i in range(nbWorkers):          # pylint: disable=W0612
      t = WorkerThread(self.workToDispatch)
      self.workers.append(t)
    # a DBUpdater thread
    self.dbthread = DBUpdaterThread(self.updateDBQueue)
    # start the thread
    self.setDaemon(True)
    self.start()

  def join(self, timeout=None):
    '''joins the worker threads, the DBUpdater thread and this thread itself.
    Sentinel (None) entries are pushed to the internal queues first so that
    the threads wake up from their blocking get calls'''
    # put None values to the worker queue so that workers go out of their blocking call to get
    for i_unused in range(len(self.workers)):
      self.workToDispatch.put((None, None))
    # join the worker threads
    for w in self.workers:
      w.join(timeout)
    # put None values to the updateDBQueue so that dbthread goes out of its blocking call to get
    self.updateDBQueue.put((None, (None, None), None, None, None))
    # join the db thread
    self.dbthread.join(timeout)
    # join the master thread
    threading.Thread.join(self, timeout)

  def dbConnection(self):
    '''returns a connection to the stager DB.
    The connection is cached and reconnections are handled'''
    if self.stagerConnection == None:
      self.stagerConnection = castor_tools.connectToStager()
      self.stagerConnection.autocommit = True
    return self.stagerConnection

  def _schedule(self, transferId, reqId, fileId, transferList, transferType, errorCode, errorMessage):
    '''schedules a given transfer on the given set of machines and handles errors.
    Returns whether the scheduling was successful'''
    # put transfers in the queue of pending transfers
    # Note that we have to do this before even attempting to schedule the
    # transfers for real, as a job may start very fast after the scheduling
    # on the first machine, and it expects the queue to be up to date.
    # the consequence is that we may have to amend the list in case we
    # could not schedule everywhere.
    for transfer in transferList:
      self.queueingTransfers.put(transfer)
    # send the transfers to the appropriate diskservers
    # note that we will retry up to 3 times if we do not manage to schedule anywhere
    # we then give up
    nbRetries = 0
    scheduleHosts = []
    while not scheduleHosts and nbRetries < 3:
      nbRetries = nbRetries + 1
      for transfer in transferList:
        try:
          connectionpool.connections.scheduleTransfer(transfer.diskServer, self.hostname, transfer.asTuple())
          scheduleHosts.append(transfer.diskServer)
        except Exception, e:
          # 'Failed to schedule xxx' message
          dlf.writenotice(errorCode, subreqid=transferId, reqid=reqId, fileId=fileId,
                          DiskServer=transfer.diskServer, Type=str(e.__class__), Message=str(e))
    # we are over, check whether we could schedule at all
    if not scheduleHosts:
      # we could not schedule anywhere.... so fail the transfer in the DB
      self.updateDBQueue.put((transferId, fileId, 1721, errorMessage, reqId)) # 1721 = ESTSCHEDERR
      # and remove it from the server queue
      self.queueingTransfers.remove([(transferId, transferType)])
      return False
    else:
      # see where we could not schedule
      failedHosts = set([transfer.diskServer for transfer in transferList]) - set(scheduleHosts)
      # We could schedule at least on one host
      if failedHosts:
        # but we have failed on others : inform server queue of the failures
        if self.queueingTransfers.transfersStartingFailed([transfer for transfer in transferList
                                                           if transfer.diskServer in failedHosts]):
          # It seems that finally we have not been able to schedule anywhere....
          # This may seem in contradiction with the last but one comment but it actually
          # only means that the machines to which we've managed to schedule have already
          # tried to start the job in the mean time and have all failed, e.g because
          # they have no space.
          # So in practice, we will not start the job and we have to inform the DB
          self.updateDBQueue.put((transferId, fileId, 1721, errorMessage, reqId)) # 1721 = ESTSCHEDERR
          return False
      # 'Marking transfer as scheduled' message
      dlf.write(msgs.TRANSFERSCHEDULED, subreqid=transferId, reqid=reqId, fileId=fileId, type=transferType, hosts=str(scheduleHosts))
      return True

  def stop(self):
    '''Stops processing of this thread'''
    # first stop the activity of taking new jobs from the DB
    self.running = False
    # now wait that the internal queue is empty
    # note that this should be implemented with Queue.join and Queue.task_done if we would have python 2.5
    while not self.workToDispatch.empty():
      time.sleep(0.1)
    # then stop the workers
    for w in self.workers:
      w.running = False
    # and finally stop the DB thread
    self.dbthread.running = False

    
class UserDispatcherThread(AbstractDispatcherThread):
  '''Dispatcher thread dedicated to user transfers'''

  def __init__(self, queueingTransfers, nbWorkers=5):
    '''constructor of this thread. Arguments are the connection pool and the transfer queue to use'''
    super(UserDispatcherThread, self).__init__(queueingTransfers, 'UserDispatcherThread', nbWorkers)

  def _scheduleStandard(self, transfer, destFilesystems):
    '''Schedules a standard user transfer and handles issues.
    destFilesystems is a '|'-separated list of 'diskServer:mountPoint' candidates, or None'''
    # extract list of candidates where to schedule and log
    if None == destFilesystems:
      schedCandidates = []
    else:
      schedCandidates = [candidate.split(':') for candidate in destFilesystems.split('|')]
    # 'Scheduling standard transfer' message
    dlf.writedebug(msgs.SCHEDTRANSFER, subreqid=transfer.transferId, reqid=transfer.reqId,
                   fileId=transfer.fileId, DiskServers=str(schedCandidates))
    # build a list of transfers to schedule for each machine
    transferList = []
    for diskServer, mountPoint in schedCandidates:
      dtransfer = copy.copy(transfer)
      dtransfer.diskServer = diskServer
      dtransfer.mountPoint = mountPoint
      transferList.append(dtransfer)
    # effectively schedule the transfer
    self._schedule(transfer.transferId, transfer.reqId, transfer.fileId, transferList, 'standard',
                   msgs.SCHEDTRANSFERFAILED, 'Unable to schedule transfer')

  def run(self):
    '''goes to the stager DB and retrieves user jobs to schedule'''
    configuration = castor_tools.castorConf()
    while self.running:
      try:
        # setup an oracle connection and register our interest for 'transferReadyToSchedule' alerts
        stcur = self.dbConnection().cursor()
        try:
          stcur.execute("BEGIN DBMS_ALERT.REGISTER('transferReadyToSchedule'); END;")
          # prepare a cursor for database polling
          stcur = self.dbConnection().cursor()
          stcur.arraysize = 50
          # output bind variables of userTransferToSchedule
          srId = stcur.var(cx_Oracle.NUMBER)
          srSubReqId = stcur.var(cx_Oracle.STRING)
          srProtocol = stcur.var(cx_Oracle.STRING)
          destFilesystems = stcur.var(cx_Oracle.STRING)
          reqId = stcur.var(cx_Oracle.STRING)
          cfFileId = stcur.var(cx_Oracle.NUMBER)
          cfNsHost = stcur.var(cx_Oracle.STRING)
          reqSvcClass = stcur.var(cx_Oracle.STRING)
          reqType = stcur.var(cx_Oracle.NUMBER)
          reqEuid = stcur.var(cx_Oracle.NUMBER)
          reqEgid = stcur.var(cx_Oracle.NUMBER)
          srOpenFlags = stcur.var(cx_Oracle.STRING)
          clientIp = stcur.var(cx_Oracle.NUMBER)
          clientPort = stcur.var(cx_Oracle.NUMBER)
          clientSecure = stcur.var(cx_Oracle.NUMBER)
          reqCreationTime = stcur.var(cx_Oracle.NUMBER)
          stTransferToSchedule = 'BEGIN userTransferToSchedule(:srId, :srSubReqId , :srProtocol, :destFilesystems, :reqId, :cfFileId, :cfNsHost, :reqSvcClass, :reqType, :reqEuid, :reqEgid, :srOpenFlags, :clientIp, :clientPort, :clientSecure, :reqCreationTime); END;' # pylint: disable=C0301
          # infinite loop over the polling of the DB
          while self.running:
            # see whether there is something to do
            # note that this will hang until something comes or the internal timeout is reached
            stcur.execute(stTransferToSchedule, (srId, srSubReqId, srProtocol, destFilesystems, reqId, cfFileId,
                                                 cfNsHost, reqSvcClass, reqType, reqEuid, reqEgid,
                                                 srOpenFlags, clientIp, clientPort, clientSecure,
                                                 reqCreationTime))
            # in case of timeout, we may have nothing to do
            if srId.getvalue() != None:
              # errors are handled internally and there are no exception others than
              # the ones implying the end of the processing
              self.workToDispatch.put((self._scheduleStandard,
                                       (Transfer(srSubReqId.getvalue(), reqId.getvalue(),
                                                 (cfNsHost.getvalue(), int(cfFileId.getvalue())),
                                                 int(reqEuid.getvalue()), int(reqEgid.getvalue()),
                                                 reqSvcClass.getvalue(), reqCreationTime.getvalue(),
                                                 srProtocol.getvalue(), srId.getvalue(),
                                                 int(reqType.getvalue()), srOpenFlags.getvalue(),
                                                 inttoip(int(clientIp.getvalue())),
                                                 int(clientPort.getvalue()),int(clientSecure.getvalue())),
                                        destFilesystems.getvalue())))
              # if maxNbTransfersScheduledPerSecond is given, request throttling is active
              # What it does is keep a count of the number of scheduled request in the current second
              # and wait the rest of the second if it reached the limit
              maxNbTransfersScheduledPerSecond = configuration.getValue('TransferManager', 'MaxNbTransfersScheduledPerSecond', -1, int)
              if maxNbTransfersScheduledPerSecond >= 0:
                currentTime = time.time()
                currentSecond = int(currentTime)
                # reset the counters if we've changed second
                if currentSecond != self.currentSecond:
                  self.currentSecond = currentSecond
                  self.nbTransfersScheduled = 0
                # increase counter of number of transfers scheduled within the current second
                self.nbTransfersScheduled = self.nbTransfersScheduled + 1
                # did we reach our quota of requests for this second ?
                if self.nbTransfersScheduled >= maxNbTransfersScheduledPerSecond:
                  # check that the second is not just over, so that we do not sleep a negative time
                  if currentTime < self.currentSecond + 1:
                    # wait until the second is over
                    time.sleep(self.currentSecond + 1 - currentTime)
        finally:
          stcur.close()
      except Exception, e:
        # "Caught exception in Dispatcher for regular Job" message
        dlf.writeerr(msgs.DISPATCHJOBEXCEPTION, Type=str(e.__class__), Message=str(e))
        # check whether we should reconnect to DB, and do so if needed
        self.dbConnection().checkForReconnection(e)
        # then sleep a bit to not loop to fast on the error
        time.sleep(1)
396
397
398
399
400
401
402
    
class D2DDispatcherThread(AbstractDispatcherThread):
  '''Dispatcher thread dedicated to disk to disk transfers'''

  def __init__(self, queueingTransfers, nbWorkers=5):
    '''constructor of this thread. Arguments are the connection pool and the transfer queue to use'''
    super(D2DDispatcherThread, self).__init__(queueingTransfers, 'D2DDispatcherThread', nbWorkers)

  def _scheduleD2d(self, srcTransfer, sourceFileSystems, destFilesystems):
    '''Schedules a disk to disk copy on the source and destinations and handle issues.
    sourceFileSystems and destFilesystems are '|'-separated lists of
    'diskServer:mountPoint' candidates, or None when no candidate exists'''
    # check whether the sources are not empty
    if sourceFileSystems == None:
      # fail the transfer immediately as we have nowhere to go. This will log the error too
      self.updateDBQueue.put((srcTransfer.transferId, srcTransfer.fileId, 1721,   # 1721 = ESTSCHEDERR
                              'No source found', srcTransfer.reqId))
      return
    # check whether the destinations are not empty
    if destFilesystems == None:
      # fail the transfer immediately as we have nowhere to go. This will log the error too
      self.updateDBQueue.put((srcTransfer.transferId, srcTransfer.fileId, 1721,   # 1721 = ESTSCHEDERR
                              'No destination host found', srcTransfer.reqId))
      return
    # first schedule sources transfers
    schedSourceCandidates = [candidate.split(':') for candidate in sourceFileSystems.split('|')]
    # 'Scheduling d2d source' message
    dlf.writedebug(msgs.SCHEDD2DSRC, subreqid=srcTransfer.transferId, reqid=srcTransfer.reqId,
                   fileId=srcTransfer.fileId, DiskServers=str(schedSourceCandidates))
    # build one copy of the transfer per source candidate
    transferList = []
    for diskServer, mountPoint in schedSourceCandidates:
      stransfer = D2DTransfer(**srcTransfer.__dict__)
      stransfer.diskServer = diskServer
      stransfer.mountPoint = mountPoint
      transferList.append(stransfer)
    # effectively schedule the transfer onto its source
    if not self._schedule(srcTransfer.transferId, srcTransfer.reqId, srcTransfer.fileId, transferList,
                          'd2dsrc', msgs.SCHEDD2DSRCFAILED, 'Unable to schedule on source host'):
      # source could not be scheduled. Give up. Note that the stagerDB has already been updated
      return
    # now schedule on all potential destinations
    schedDestCandidates = [candidate.split(':') for candidate in destFilesystems.split('|')]
    # 'Scheduling d2d destination' message
    dlf.writedebug(msgs.SCHEDD2DDEST, subreqid=srcTransfer.transferId, reqid=srcTransfer.reqId,
                   fileId=srcTransfer.fileId, DiskServers=str(schedDestCandidates))
    # build the list of hosts and transfers to launch
    transferList = []
    for diskServer, mountPoint in schedDestCandidates:
      dtransfer = D2DTransfer(**srcTransfer.__dict__)
      dtransfer.transferType = TransferType.D2DDST
      dtransfer.diskServer = diskServer
      dtransfer.mountPoint = mountPoint
      transferList.append(dtransfer)
    # effectively schedule the transfer onto its destination
    self._schedule(srcTransfer.transferId, srcTransfer.reqId, srcTransfer.fileId, transferList,
                   'd2ddest', msgs.SCHEDD2DDESTFAILED, 'Failed to schedule d2d destination')

  def run(self):
    '''main method, containing the infinite loop'''
    while self.running:
      try:
        # setup an oracle connection and register our interest for 'transferReadyToSchedule' alerts
        stcur = self.dbConnection().cursor()
        try:
          stcur.execute("BEGIN DBMS_ALERT.REGISTER('d2dReadyToSchedule'); END;")
          # prepare a cursor for database polling
          stcur = self.dbConnection().cursor()
          stcur.arraysize = 50
          # output bind variables of D2dTransferToSchedule
          transferId = stcur.var(cx_Oracle.STRING)
          reqId = stcur.var(cx_Oracle.STRING)
          fileId = stcur.var(cx_Oracle.NUMBER)
          nsHost = stcur.var(cx_Oracle.STRING)
          euid = stcur.var(cx_Oracle.NUMBER)
          egid = stcur.var(cx_Oracle.NUMBER)
          svcClassName = stcur.var(cx_Oracle.STRING)
          creationTime = stcur.var(cx_Oracle.NUMBER)
          destFileSystems = stcur.var(cx_Oracle.STRING)
          srcFileSystems = stcur.var(cx_Oracle.STRING)
          stTransferToSchedule = 'BEGIN D2dTransferToSchedule(:transferId, :reqId, :fileId, :nsHost, :euid, :egid, :svcClassName, :creationTime, :destFileSystems, :srcFileSystems); END;' # pylint: disable=C0301
          # infinite loop over the polling of the DB
          while self.running:
            # see whether there is something to do
            # note that this will hang until something comes or the internal timeout is reached
            stcur.execute(stTransferToSchedule, (transferId, reqId, fileId, nsHost, euid, egid,
                                                 svcClassName, creationTime,
                                                 destFileSystems, srcFileSystems))
            # in case of timeout, we may have nothing to do
            if transferId.getvalue() != None:
              self.workToDispatch.put((self._scheduleD2d,
                                       (D2DTransfer(transferId.getvalue(), reqId.getvalue(),
                                                    (nsHost.getvalue(), int(fileId.getvalue())),
                                                    int(euid.getvalue()), int(egid.getvalue()),
                                                    svcClassName.getvalue(), creationTime.getvalue(),
                                                    TransferType.D2DSRC),
                                        srcFileSystems.getvalue(), destFileSystems.getvalue())))
        finally:
          stcur.close()
      except Exception, e:
        # "Caught exception in Dispatcher thread" message
        dlf.writeerr(msgs.DISPATCHJOBEXCEPTION, Type=str(e.__class__), Message=str(e))
        # check whether we should reconnect to DB, and do so if needed
        self.dbConnection().checkForReconnection(e)
        # then sleep a bit to not loop to fast on the error
        time.sleep(1)