Commit 066f5dda authored by Giuseppe Lo Presti

Implemented regular and backfill queues in the diskmanager

parent e14502c7
......@@ -1130,6 +1130,7 @@ PROCEDURE D2dTransferToSchedule(outTransferId OUT VARCHAR2, outReqId OUT VARCHAR
outFileId OUT INTEGER, outNsHost OUT VARCHAR2,
outEuid OUT INTEGER, outEgid OUT INTEGER,
outSvcClassName OUT VARCHAR2, outCreationTime OUT INTEGER,
outReplicationType OUT INTEGER,
outDestFileSystems OUT VARCHAR2, outSourceFileSystems OUT VARCHAR2) AS
cfId NUMBER;
-- Cursor to select the next candidate for submission to the scheduler ordered
......@@ -1186,8 +1187,10 @@ BEGIN
UPDATE /*+ INDEX(Disk2DiskCopyJob PK_Disk2DiskCopyJob_Id)*/ Disk2DiskCopyJob
SET status = dconst.DISK2DISKCOPYJOB_SCHEDULED
WHERE id = varD2dJId
RETURNING transferId, castorFile, ouid, ogid, creationTime, destSvcClass, getSvcClassName(destSvcClass)
INTO outTransferId, varCfId, outEuid, outEgid, outCreationTime, varSvcClassId, outSvcClassName;
RETURNING transferId, castorFile, ouid, ogid, creationTime, destSvcClass,
getSvcClassName(destSvcClass), replicationType
INTO outTransferId, varCfId, outEuid, outEgid, outCreationTime, varSvcClassId,
outSvcClassName, outReplicationType;
-- Extract the rest of the information required by the transfer manager
SELECT fileId, nsHost, fileSize INTO outFileId, outNsHost, varFileSize
FROM CastorFile
......
......@@ -34,7 +34,7 @@ import dlf
import castor_tools
import connectionpool
from diskmanagerdlf import msgs
from transfer import TransferType
from transfer import TransferType, D2DTransferType
class QueueingTransfer(object):
'''little container describing a queueing transfer'''
......@@ -45,16 +45,18 @@ class QueueingTransfer(object):
class LocalQueue(Queue.Queue):
'''Class managing a queue of pending transfers.
There are actually 2 queues, a dictionnary and a set involved :
There are actually 3 queues, a dictionary and a set involved :
- the main queue is the object itself and holds all transfers that were not considered so far.
- _pendingD2dDest is a dictionary of disk2disk destination transfers that have been considered but could not
be started because the source was not ready. They will be retried regularly until the source
becomes ready. They are stored together with the time of their next retry. The interval between
retries is equal to half the time elapsed since the arrival of the transfer, with a maximum
configured in castor.conf (DiskManager/MaxRetryInterval).
- _priorityQueue is a queue of transfers to be started as soon as possible. These transfers are the disk2disk
- priorityQueue is a queue of transfers to be started as soon as possible. These transfers are the disk2disk
destination transfers that have been pending on the readiness of the source and can now be started
- finally, all transfers handled are indexed in a dictionnary called queueingTransfers handling detailed
- backfillQueue is a queue for non-user-triggered transfers, to be executed with a lower priority than
the regular transfers.
- finally, all transfers handled are indexed in a dictionary called queueingTransfers holding their detailed
information
'''
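The retry-interval rule quoted in the docstring is not part of this hunk; the following is a minimal, standalone sketch of it (the names nextRetryTime and arrivalTime, and the 300s cap used as a default, are illustrative placeholders, not the actual castor.conf default):

import time

def nextRetryTime(arrivalTime, maxRetryInterval=300):
    '''returns the absolute time at which a pending d2d destination should be
    retried: half the time elapsed since its arrival, capped by the value
    mirroring DiskManager/MaxRetryInterval (placeholder default here)'''
    now = time.time()
    return now + min((now - arrivalTime) / 2.0, maxRetryInterval)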
......@@ -71,23 +73,32 @@ class LocalQueue(Queue.Queue):
self.pendingD2dDest = {}
# set of transfers to start with highest priority
self.priorityQueue = Queue.Queue()
# set of transfers to start with lower priority
self.backfillQueue = Queue.Queue()
# counter of scheduled regular (i.e. user driven) jobs
self.countRegularJobs = 0
def put(self, scheduler, transfer):
'''Put new transfer in the queue'''
'''Put a new transfer in the regular or backfill queue according to its type and replication type'''
self.lock.acquire()
try:
# first keep note of the new transfer (index by subreqId)
# first keep note of the new transfer (indexed by subreqId)
self.queueingTransfers[transfer.transferId] = QueueingTransfer(scheduler, transfer)
# then add it to the underlying queue
Queue.Queue.put(self, transfer.transferId)
# then add it to the underlying queue, depending on its type
if transfer.transferType == TransferType.STD or \
((transfer.transferType == TransferType.D2DSRC or transfer.transferType == TransferType.D2DDST) \
and transfer.replicationType == D2DTransferType.USER):
Queue.Queue.put(self, transfer.transferId)
else:
self.backfillQueue.put(transfer.transferId)
finally:
self.lock.release()
def putPriority(self, scheduler, transfer):
'''Put new transfer in the queue'''
'''Put a new transfer in the priority queue'''
self.lock.acquire()
try:
# first keep note of the new transfer (index by subreqId)
# first keep note of the new transfer (indexed by subreqId)
self.queueingTransfers[transfer.transferId] = QueueingTransfer(scheduler, transfer)
# then add it to the underlying priority queue
self.priorityQueue.put(transfer.transferId)
......@@ -98,16 +109,29 @@ class LocalQueue(Queue.Queue):
'''get a transfer from the queue. Times out after 1s'''
found = False
while not found:
# try to get a priority transfer first
try :
try:
# try to get a priority transfer first
transferId = self.priorityQueue.get(False)
except Queue.Empty:
# else get next transfer from the regular queue
# timeout after 1s so that we can go back to the priority queue
# else get the next transfer from either the regular or the backfill queue;
# the backfill queue is guaranteed to be served at least once
# every <MaxRegularJobsBeforeBackfill> regular jobs
try:
transferId = Queue.Queue.get(self, timeout=1)
if self.countRegularJobs == self.config.getValue('DiskManager', 'MaxRegularJobsBeforeBackfill', 20, int):
# give a chance to the backfill queue
raise Queue.Empty
else:
# block and timeout after 1s so that we can go back to the other queues
transferId = Queue.Queue.get(self, timeout=1)
self.countRegularJobs += 1
except Queue.Empty:
return None
try:
self.countRegularJobs = 0
# don't wait on the backfill queue: in case nothing is found,
# we will be back soon and we'll block on the normal queue
transferId = self.backfillQueue.get(False)
except Queue.Empty:
return None
self.lock.acquire()
try:
try:
......
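As a side note, the interleaving implemented in the hunk above can be illustrated with a standalone toy (not CASTOR code): regular jobs are served until the counter reaches the configured threshold, then one backfill job gets a slot and the counter is reset, so under sustained load backfill receives at most 1 slot out of every threshold + 1 (about 5% for the default of 20). The helper below only mimics that logic; names and values are illustrative.

import Queue

def nextJob(regular, backfill, state, threshold=20):
    '''returns the next job id, preferring the regular queue but giving the
    backfill queue one slot every <threshold> regular jobs'''
    try:
        if state['count'] == threshold:
            raise Queue.Empty      # force a look at the backfill queue
        job = regular.get(False)
        state['count'] += 1
        return job
    except Queue.Empty:
        state['count'] = 0
        try:
            return backfill.get(False)
        except Queue.Empty:
            return None

regular, backfill, state = Queue.Queue(), Queue.Queue(), {'count': 0}
for i in range(50):
    regular.put('user-%d' % i)
    backfill.put('drain-%d' % i)
served = [nextJob(regular, backfill, state) for _ in range(42)]
print(sum(1 for j in served if j.startswith('drain')))   # 2 backfill slots out of 42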
......@@ -66,6 +66,27 @@ class TapeTransferType(object):
return 'UNKNOWN'
class D2DTransferType(object):
'''Constants defining the existing types of d2d transfers'''
USER = 0
INTERNAL = 1
DRAINING = 2
def __init__(self):
'''empty constructor, raises an exception'''
raise NotImplementedError
@staticmethod
def toStr(tType):
'''returns a readable version of the given d2d transfer type'''
if tType == D2DTransferType.USER:
return 'user'
elif tType == D2DTransferType.INTERNAL:
return 'internal'
elif tType == D2DTransferType.DRAINING:
return 'draining'
else:
return 'UNKNOWN'
def getProcessStartTime(pid):
'''Little utility able to get the start time of a process
Note that this is linux specific code'''
......@@ -246,10 +267,11 @@ class Transfer(BaseTransfer):
class D2DTransfer(BaseTransfer):
'''little container describing a disk to disk transfer'''
def __init__(self, transferId, reqId, fileId, euid, egid, svcClassName, creationTime, transferType,
diskServer='', mountPoint='', isSrcRunning=False):
replicationType, diskServer='', mountPoint='', isSrcRunning=False):
'''constructor'''
super(D2DTransfer, self).__init__(transferId, reqId, fileId, euid, egid,
svcClassName, creationTime, transferType, diskServer, mountPoint)
self.replicationType = replicationType
self.isSrcRunning = isSrcRunning
@property
......
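For reference, a minimal, hypothetical construction of a D2DTransfer carrying the new replicationType field could look as follows (all values are made up; only the parameter order comes from the constructor above):

from transfer import D2DTransfer, D2DTransferType, TransferType

t = D2DTransfer('tid-0001', 'rid-0001', ('cns.example.org', 42), 0, 0,
                'default', 1400000000.0, TransferType.D2DDST,
                D2DTransferType.DRAINING)
assert t.replicationType == D2DTransferType.DRAINING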
......@@ -468,15 +468,16 @@ class D2DDispatcherThread(AbstractDispatcherThread):
egid = stcur.var(cx_Oracle.NUMBER)
svcClassName = stcur.var(cx_Oracle.STRING)
creationTime = stcur.var(cx_Oracle.NUMBER)
replicationType = stcur.var(cx_Oracle.NUMBER)
destFileSystems = stcur.var(cx_Oracle.STRING)
srcFileSystems = stcur.var(cx_Oracle.STRING)
stTransferToSchedule = 'BEGIN D2dTransferToSchedule(:transferId, :reqId, :fileId, :nsHost, :euid, :egid, :svcClassName, :creationTime, :destFileSystems, :srcFileSystems); END;' # pylint: disable=C0301
stTransferToSchedule = 'BEGIN D2dTransferToSchedule(:transferId, :reqId, :fileId, :nsHost, :euid, :egid, :svcClassName, :creationTime, :replicationType, :destFileSystems, :srcFileSystems); END;' # pylint: disable=C0301
# infinite loop over the polling of the DB
while self.running:
# see whether there is something to do
# note that this will hang until something comes or the internal timeout is reached
stcur.execute(stTransferToSchedule, (transferId, reqId, fileId, nsHost, euid, egid,
svcClassName, creationTime,
svcClassName, creationTime, replicationType,
destFileSystems, srcFileSystems))
# in case of timeout, we may have nothing to do
if transferId.getvalue() != None:
......@@ -485,7 +486,7 @@ class D2DDispatcherThread(AbstractDispatcherThread):
(nsHost.getvalue(), int(fileId.getvalue())),
int(euid.getvalue()), int(egid.getvalue()),
svcClassName.getvalue(), creationTime.getvalue(),
TransferType.D2DSRC),
TransferType.D2DSRC, replicationType.getvalue()),
srcFileSystems.getvalue(), destFileSystems.getvalue())))
finally:
stcur.close()
......
......@@ -393,29 +393,30 @@ class TransferManagerService(rpyc.Service):
if transfer.transferType == TransferType.D2DDST:
try:
self.__class__.acquireDbConnection()
stcur = self.__class__.dbConnection().cursor()
srcDcPath = stcur.var(cx_Oracle.STRING)
destDcPath = stcur.var(cx_Oracle.STRING)
stcur.execute("BEGIN disk2DiskCopyStart(:1, :2, :3, :4, :5, :6, :7, :8, :9); END;",
(transfer.transferId, transfer.fileId[1], transfer.fileId[0],
transfer.diskServer, transfer.mountPoint,
srcTransfer.diskServer, srcTransfer.mountPoint,
destDcPath, srcDcPath))
stcur.close()
return srcDcPath.getvalue(), destDcPath.getvalue()
except Exception, e:
# check whether we should reconnect to DB, and do so if needed
self.__class__.dbConnection().checkForReconnection(e)
# detect special error where the client can give up
if isinstance(e, cx_Oracle.Error):
error, = e.args
if isinstance(error, cx_Oracle._Error):
errorcode = error.code
if (errorcode == 20110):
# raise ValueError to tell the upper layer to give up
raise ValueError(e.message)
# otherwise reraise for the upper layer
raise e
try:
stcur = self.__class__.dbConnection().cursor()
srcDcPath = stcur.var(cx_Oracle.STRING)
destDcPath = stcur.var(cx_Oracle.STRING)
stcur.execute("BEGIN disk2DiskCopyStart(:1, :2, :3, :4, :5, :6, :7, :8, :9); END;",
(transfer.transferId, transfer.fileId[1], transfer.fileId[0],
transfer.diskServer, transfer.mountPoint,
srcTransfer.diskServer, srcTransfer.mountPoint,
destDcPath, srcDcPath))
stcur.close()
return srcDcPath.getvalue(), destDcPath.getvalue()
except Exception, e:
# check whether we should reconnect to DB, and do so if needed
self.__class__.dbConnection().checkForReconnection(e)
# detect special error where the client can give up
if isinstance(e, cx_Oracle.Error):
error, = e.args
if isinstance(error, cx_Oracle._Error):
errorcode = error.code
if (errorcode == 20110):
# raise ValueError to tell the upper layer to give up
raise ValueError(e.message)
# otherwise reraise for the upper layer
raise e
finally:
self.__class__.releaseDbConnection()
else:
......
......@@ -281,22 +281,26 @@
# Interval between two heartbeats send to the transfer manager, expressed in seconds.
# Default is 1.0
#DiskManager HeartbeatInterval 1.0
# Interval between two log messages for 'heartbeat not sent' errors, expressed in seconds.
# Others are logged only at debug level so as not to flood the logs when
# the transfermanagers are all down.
# Default is 300.0
#DiskManager HeartbeatNotSentLogInterval 300.0
# Number of user-requested jobs to be scheduled before a backfill job (e.g. an internally
# triggered job for draining or rebalancing) is taken into account. A value of 20 means that
# under heavy load only about 5% of the slots go to backfill jobs. When there is no user
# load, backfill jobs may take up to all available slots, but user-requested jobs
# keep higher priority and will overtake backfill ones.
#DiskManager MaxRegularJobsBeforeBackfill 20
# List of mountpoints the DiskManager daemon should monitor
#DiskManager MountPoints /srv/castor/01/ /srv/castor/02/ /srv/castor/03/
# The following definitions allow the max and the min allowed free space to be defined
# at the disk server level respectively.
#DiskManager FSMaxFreeSpace .10
#DiskManager FSMinAllowedFreeSpace .05
......@@ -794,7 +798,6 @@
# message priorities above and including INFO.
#LogMask tapebridged LOG_INFO
#LogMask d2dtransfer LOG_INFO
#LogMask gcd LOG_INFO
#LogMask rhd LOG_INFO
#LogMask rmcd LOG_INFO
......