# initialization of the messages
dlf.addmessages({msgs.INVOKINGSCHEDULETRANSFER : 'Invoking scheduleTransfer',
......@@ -87,4 +89,10 @@ dlf.addmessages({msgs.INVOKINGSCHEDULETRANSFER : 'Invoking scheduleTransfer',
msgs.SENDREPORTEXCEPTION : 'Caught exception when sending report to transfermanager, will try other transfermanagers',
msgs.SENDREPORTFAILURE : 'Could not send report to any transfermanager, giving up',
msgs.INVOKINGMACHINEDISABLED : 'Invoking machineDisabled',
msgs.INVOKINGFSDISABLED : 'Invoking FSDisabled'})
msgs.INVOKINGFSDISABLED : 'Invoking FSDisabled',
msgs.SCHEDFROMBACKFILL : 'Not under pressure, scheduling work from the backfill queues',
msgs.PUTJOBINBACKFILL : 'Under pressure, putting job to d2dbackfill queue',
msgs.SCHEDUSERJOBUNDERPRESSURE : 'Under pressure, but still scheduling user job',
msgs.SCHEDPRIORITY : 'Scheduled job from priority queue',
msgs.AVOIDBACKFILLSTARV : 'Attempted to schedule one job from backfill according to MaxRegularJobsBeforeBackfill',
msgs.SCHEDUSERJOB : 'Scheduled job from regular queue'})
......@@ -114,10 +114,15 @@ class LocalQueue(Queue.Queue):
'''internal method to get a transfer from the backfill queues,
managing them depending on how many slots are already taken'''
# are we already using too many slots? i.e. are there less than GuaranteedUserSlotsPercentage% free slots?
if self.runningTransfers.nbUsedSlots() <= self.config.getValue('DiskManager', 'NbSlots', 0, int) * \
(100 - self.config.getValue('DiskManager', 'GuaranteedUserSlotsPercentage', 50, int)) / 100:
nbSlots = self.config.getValue('DiskManager', 'NbSlots', 0, int)
guaranteedUserSlotsPercentage = self.config.getValue('DiskManager', 'GuaranteedUserSlotsPercentage', 50, int)
if self.runningTransfers.nbUsedSlots() <= nbSlots * (100 - guaranteedUserSlotsPercentage) / 100:
# no, so we accept any kind of backfill job
dlf.writedebug(msgs.SCHEDFROMBACKFILL, \
nbUsedSlots=self.runningTransfers.nbUsedSlots(), \
nbSlots=nbSlots, \
# first check the d2dsrc jobs in the backfill queue, without waiting
transferId = self.d2dBackfillQueue.get(False)
# found one, return it
......@@ -142,8 +147,16 @@ class LocalQueue(Queue.Queue):
# Note that we may starve d2dsrc jobs in case of heavy user activity
# coupled with heavy rebalancing! In this case the d2d jobs will wait
# until the total activity goes below 50% of the available slots.
dlf.writedebug(msgs.PUTJOBINBACKFILL, transferId=transferId, \
nbUsedSlots=self.runningTransfers.nbUsedSlots(), \
nbSlots=nbSlots, \
dlf.writedebug(msgs.SCHEDUSERJOBUNDERPRESSURE, transferId=transferId, \
nbUsedSlots=self.runningTransfers.nbUsedSlots(), \
nbSlots=nbSlots, \
# we got one, return it
return transferId
except KeyError:
......@@ -160,13 +173,17 @@ class LocalQueue(Queue.Queue):
# try to get a priority transfer first
transferId = self.priorityQueue.get(False)
dlf.writedebug(msgs.SCHEDPRIORITY, transferId=transferId)
except Queue.Empty:
# else get next transfer from either the regular or the backfill queue,
# which is guaranteed to be taken at least once
# every <maxRegularJobsBeforeBackfill> regular jobs
if self.countRegularJobs == self.config.getValue('DiskManager', 'MaxRegularJobsBeforeBackfill', 20, int):
maxRegularJobsBeforeBackfill = self.config.getValue('DiskManager', 'MaxRegularJobsBeforeBackfill', 20, int)
if self.countRegularJobs == maxRegularJobsBeforeBackfill:
# give a chance to the backfill queue
dlf.writedebug(msgs.AVOIDBACKFILLSTARV, \
raise Queue.Empty
# in case of no load (i.e. in the previous round we found nothing) check if
......@@ -176,6 +193,7 @@ class LocalQueue(Queue.Queue):
raise Queue.Empty
# block and timeout after 1s so that we can go back to the other queues or stop
transferId = Queue.Queue.get(self, timeout=1)
dlf.writedebug(msgs.SCHEDUSERJOB, transferId=transferId)
self.countRegularJobs += 1
except Queue.Empty:
