Commit 592f9d1c authored by Sebastien Ponce's avatar Sebastien Ponce
Browse files

Merge branch 'v2_1_14Version'

parents df2cfaf6 ed5c763e
# initialization of the messages
dlf.addmessages({msgs.INVOKINGSCHEDULETRANSFER : 'Invoking scheduleTransfer',
......@@ -87,4 +89,10 @@ dlf.addmessages({msgs.INVOKINGSCHEDULETRANSFER : 'Invoking scheduleTransfer',
msgs.SENDREPORTEXCEPTION : 'Caught exception when sending report to transfermanager, will try other transfermanagers',
msgs.SENDREPORTFAILURE : 'Could not send report to any transfermanager, giving up',
msgs.INVOKINGMACHINEDISABLED : 'Invoking machineDisabled',
msgs.INVOKINGFSDISABLED : 'Invoking FSDisabled'})
msgs.INVOKINGFSDISABLED : 'Invoking FSDisabled',
msgs.SCHEDFROMBACKFILL : 'Not under pressure, scheduling work from the backfill queues',
msgs.PUTJOBINBACKFILL : 'Under pressure, putting job to d2dbackfill queue',
msgs.SCHEDUSERJOBUNDERPRESSURE : 'Under pressure, but still scheduling user job',
msgs.SCHEDPRIORITY : 'Scheduled job from priority queue',
msgs.AVOIDBACKFILLSTARV : 'Attempted to schedule one job from backfill according to MaxRegularJobsBeforeBackfill',
msgs.SCHEDUSERJOB : 'Scheduled job from regular queue'})
......@@ -99,14 +99,23 @@ class LocalQueue(Queue.Queue):
def putPriority(self, scheduler, transfer):
def _putInPriorityQueue(self, transfer):
'''Put a new transfer in the priority queue'''
if transfer.transferType == TransferType.STD or \
((transfer.transferType == TransferType.D2DSRC or transfer.transferType == TransferType.D2DDST) \
and transfer.replicationType == D2DTransferType.USER):
def putPriority(self, scheduler, transfer):
'''Put a new transfer in the priority queue and register it in queueingTransfers'''
# first keep note of the new transfer (indexed by subreqId)
self.queueingTransfers[transfer.transferId] = QueueingTransfer(scheduler, transfer)
# then add it to the underlying priority queue
# then add it to the underlying priority queue, depending on its type
......@@ -114,10 +123,15 @@ class LocalQueue(Queue.Queue):
'''internal method to get a transfer from the backfill queues,
managing them depending on how many slots are already taken'''
# are we already using too many slots? i.e. are there less than GuaranteedUserSlotsPercentage% free slots?
if self.runningTransfers.nbUsedSlots() <= self.config.getValue('DiskManager', 'NbSlots', 0, int) * \
(100 - self.config.getValue('DiskManager', 'GuaranteedUserSlotsPercentage', 50, int)) / 100:
nbSlots = self.config.getValue('DiskManager', 'NbSlots', 0, int)
guaranteedUserSlotsPercentage = self.config.getValue('DiskManager', 'GuaranteedUserSlotsPercentage', 50, int)
if self.runningTransfers.nbUsedSlots() <= nbSlots * (100 - guaranteedUserSlotsPercentage) / 100:
# no, so we accept any kind of backfill job
dlf.writedebug(msgs.SCHEDFROMBACKFILL, \
nbUsedSlots=self.runningTransfers.nbUsedSlots(), \
nbSlots=nbSlots, \
# first check the d2dsrc jobs in the backfill queue, without waiting
transferId = self.d2dBackfillQueue.get(False)
# found one, return it
......@@ -134,7 +148,8 @@ class LocalQueue(Queue.Queue):
# we will be back soon and we'll block on the normal queue
transferId = self.backfillQueue.get(False)
if self.queueingTransfers[transferId].transfer.transferType == TransferType.D2DSRC and \
if (self.queueingTransfers[transferId].transfer.transferType == TransferType.D2DSRC or \
self.queueingTransfers[transferId].transfer.transferType == TransferType.D2DDST) and \
self.queueingTransfers[transferId].transfer.replicationType != D2DTransferType.USER:
# we got one, but it's a non-user source disk-to-disk copy and we're busy.
# Hence we move it to the d2dBackfillQueue to leave some room for normal
......@@ -142,8 +157,16 @@ class LocalQueue(Queue.Queue):
# Note that we may starve d2dsrc jobs in case of heavy user activity
# coupled with heavy rebalancing! In this case the d2d jobs will wait
# until the total activity goes below 50% of the available slots.
dlf.writedebug(msgs.PUTJOBINBACKFILL, transferId=transferId, \
nbUsedSlots=self.runningTransfers.nbUsedSlots(), \
nbSlots=nbSlots, \
dlf.writedebug(msgs.SCHEDUSERJOBUNDERPRESSURE, transferId=transferId, \
nbUsedSlots=self.runningTransfers.nbUsedSlots(), \
nbSlots=nbSlots, \
# we got one, return it
return transferId
except KeyError:
......@@ -160,13 +183,17 @@ class LocalQueue(Queue.Queue):
# try to get a priority transfer first
transferId = self.priorityQueue.get(False)
dlf.writedebug(msgs.SCHEDPRIORITY, transferId=transferId)
except Queue.Empty:
# else get next transfer from either the regular or the backfill queue,
# which is guaranteed to be taken at least once
# every <maxRegularJobsBeforeBackfill> regular jobs
if self.countRegularJobs == self.config.getValue('DiskManager', 'MaxRegularJobsBeforeBackfill', 20, int):
maxRegularJobsBeforeBackfill = self.config.getValue('DiskManager', 'MaxRegularJobsBeforeBackfill', 20, int)
if self.countRegularJobs == maxRegularJobsBeforeBackfill:
# give a chance to the backfill queue
dlf.writedebug(msgs.AVOIDBACKFILLSTARV, \
raise Queue.Empty
# in case of no load (i.e. in the previous round we found nothing) check if
......@@ -176,6 +203,7 @@ class LocalQueue(Queue.Queue):
raise Queue.Empty
# block and timeout after 1s so that we can go back to the other queues or stop
transferId = Queue.Queue.get(self, timeout=1)
dlf.writedebug(msgs.SCHEDUSERJOB, transferId=transferId)
self.countRegularJobs += 1
except Queue.Empty:
......@@ -249,7 +277,7 @@ class LocalQueue(Queue.Queue):
# put the transferId back into the priority queue in it's time to retry the transfer
dlf.writedebug(msgs.RETRYTRANSFER, subreqid=transferId, reqid=reqid) # "Retrying transfer" message
del self.pendingD2dDest[transferId]
......@@ -266,7 +294,7 @@ class LocalQueue(Queue.Queue):
if timeOfNextTry < currentTime:
# put the transferId back into the priority queue in it's time to retry the transfer
dlf.writedebug(msgs.RETRYTRANSFER, subreqid=transferId) # "Retrying transfer" message
# cleanup list of pending transferIds
for tid in toBeDeleted:
......@@ -308,10 +308,18 @@ class ServerQueue(dict):
# we are precisely in the mentionned case. We can safely return as we already think
# that the job is running on that machine
# "Transfer starting reconfirmed" message
dlf.writedebug(msgs.TRANSFERSTARTCONFIRMED, DiskServer=transfer.diskServer,
subreqId=transfer.transferId, reqId=transfer.reqId)
dlf.write(msgs.TRANSFERSTARTCONFIRMED, DiskServer=transfer.diskServer,
subreqId=transfer.transferId, reqId=transfer.reqId)
if transfer.transferType == TransferType.D2DDST:
return self.d2dsrcrunning[transfer.transferId].srcTransfer
return self.d2dsrcrunning[transfer.transferId].srcTransfer
except KeyError:
# In this special case, the reconfirmation reconfirms that the job
# was canceled, not that it can run. This is made clear by the fact
# that the source has been cleaned up while it should be running
dlf.write(msgs.TRANSFERCANCELEDCONFIRMED, DiskServer=transfer.diskServer,
subreqId=transfer.transferId, reqId=transfer.reqId)
raise ValueError("Request canceled while queueing and retried due to timeout")
# The transfer has really started somewhere else. Let the diskServer know by raising an exception
......@@ -36,6 +36,7 @@ import random
import socket
import dlf
from transfermanagerdlf import msgs
import transfer
import connectionpool, diskserverlistcache
class SynchronizerThread(threading.Thread):
......@@ -44,9 +45,11 @@ class SynchronizerThread(threading.Thread):
transfers, meaning that it will check that transfers running for already long according to the DB
are effectively still running. If they are not, is will update the DB accordingly'''
def __init__(self):
def __init__(self, queueingTransfers):
'''constructor of this thread'''
super(SynchronizerThread, self).__init__(name='Synchronizer')
# the list of queueing transfers
self.queueingTransfers = queueingTransfers
# whether we should continue running
self.running = True
# whether we are connected to the stager DB
......@@ -160,7 +163,13 @@ class SynchronizerThread(threading.Thread):
stcur.callproc('getSchedulerD2dTransfers', [subReqIdsCur])
subReqIds = set([t[0] for t in subReqIdsCur.fetchall()])
# find out the set of these d2d source transfers that are no more in the DB
transfersToEnd = list(allTMD2dSrcSet - subReqIds)
transfersToEnd = allTMD2dSrcSet - subReqIds
# remove the ones for which a destination is still pending
# (they will be cleaned up by the destination job)
pendingD2dDst = set([transferId for transferId, transferType_unused \
in self.queueingTransfers.transfersLocations.keys() \
if transferType_unused == transfer.TransferType.D2DDST])
transfersToEnd = list(transfersToEnd - pendingD2dDst)
# and end them
if transfersToEnd:
tid2fileid = dict([(transferid, fileid) for transferid, reqid_unused, fileid in allTMD2dSrc])
......@@ -666,7 +666,7 @@ try:
# launch a processing thread that will regularly synchronize the stager DB with the running
# transfers, meaning that it will check that transfers running for already long according to the DB
# are effectively still running. If they are not, is will update the DB accordingly
synchroThread = synchronizer.SynchronizerThread()
synchroThread = synchronizer.SynchronizerThread(queueingTransfers)
# starts listening
except SystemExit:
......@@ -48,6 +48,7 @@ msgs = dlf.enum('ABORTEREXCEPTION', 'SYNCHROFAILED', 'SYNCHROEXCEPTION',
......@@ -113,6 +114,7 @@ dlf.addmessages({msgs.ABORTEREXCEPTION : 'Caught exception in Aborter thread',
msgs.SYNCDBWITHD2DSRC : 'Synchronizing stager DB with running d2d sources',
msgs.COULDNOTCONTACTTM : 'Could not contact transfer manager',
msgs.TRANSFERSTARTCONFIRMED : 'Transfer starting reconfirmed',
msgs.TRANSFERCANCELEDCONFIRMED : 'Transfer starting just reconfirmed was actually cancelation',
msgs.D2DENDEXCEPTION : 'Unable to end d2d as it\'s not in the server list. Probable race condition',
msgs.D2DDESTRESTARTERROR : 'Unable to put d2ddest back in queue as sources are missing. Probable race condition',
msgs.INVOKINGTRANSFERBACKTOQUEUE : 'Invoking transferBackToQueue',
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment