Commit 274110b7 authored by Giuseppe Lo Presti's avatar Giuseppe Lo Presti
Browse files

Make sure the backfill queue is not throttled when there is no load.

Implemented on a plane, tested on ground :-)
parent 6d740f77
......@@ -55,9 +55,9 @@ class LocalQueue(Queue.Queue):
- priorityQueue is a queue of transfers to be started as soon as possible. These transfers are the disk2disk
destination transfers that have been pending on the resdiness of the source and can now be started
- backfillQueue is a queue for non-user-triggered transfers to be executed with lower priority compared
to the regular transfers.
to the regular transfers, but with the ability to be used at full speed when the main queue is empty.
- finally, all transfers handled are indexed in a dictionary called queueingTransfers handling detailed
information
information.
'''
def __init__(self):
......@@ -77,6 +77,8 @@ class LocalQueue(Queue.Queue):
self.backfillQueue = Queue.Queue()
# counter of scheduled regular (i.e. user driven) jobs
self.countRegularJobs = 0
# flag to determine whether to drain the backfill queue when no user driven jobs are present
self.drainBackfill = False
def put(self, scheduler, transfer):
'''Put a new transfer in the regular or backfill queue according to the transfer replication type'''
......@@ -121,16 +123,25 @@ class LocalQueue(Queue.Queue):
# give a chance to the backfill queue
raise Queue.Empty
else:
# block and timeout after 1s so that we can go back to the other queues
# in case of no load (i.e. in the previous round we found nothing) check if
# the normal queue has got something: if yes, take it first, otherwise drain
# the backfill queue instead of waiting 1 sec so to not throttle any job
if self.drainBackfill and self.empty():
raise Queue.Empty
# block and timeout after 1s so that we can go back to the other queues or stop
transferId = Queue.Queue.get(self, timeout=1)
self.countRegularJobs += 1
except Queue.Empty:
try:
self.drainBackfill = True
self.countRegularJobs = 0
# don't wait on the backfill queue: in case nothing is found,
# we will be back soon and we'll block on the normal queue
transferId = self.backfillQueue.get(False)
except Queue.Empty:
# the backfill queue is empty, so nothing to drain (any longer):
# give back priority to the user queue (potentially waiting on it) when coming back
self.drainBackfill = False
return None
self.lock.acquire()
try:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment