Commit 4cb43bcc authored by Giuseppe Lo Presti
Browse files

Bug #103512: Incorrect handling of errors on disk-to-disk copy jobs affecting draining operations.

Fixed all items mentioned in the ticket, and added a timestamp column
to DrainingErrors that is shown by draindiskserver -q -f.
Furthermore, as DrainingErrors now contains a reference to DiskCopy,
draindiskserver -q -f prints the full path of the failed draining DiskCopy.
parent 467f6d3a
......@@ -877,7 +877,7 @@ END;
/
ALTER TABLE DiskCopy
ADD CONSTRAINT CK_DiskCopy_Status
CHECK (status IN (0, 4, 5, 6, 7, 9, 10, 11));
CHECK (status IN (0, 4, 5, 6, 7, 9, 11));
ALTER TABLE DiskCopy
ADD CONSTRAINT CK_DiskCopy_GcType
CHECK (gcType IN (0, 1, 2, 3, 4, 5, 6, 7));
......@@ -1174,6 +1174,7 @@ CREATE TABLE DrainingJob
egid INTEGER CONSTRAINT NN_DrainingJob_Egid NOT NULL,
pid INTEGER CONSTRAINT NN_DrainingJob_Pid NOT NULL,
machine VARCHAR2(2048) CONSTRAINT NN_DrainingJob_Machine NOT NULL,
reqId VARCHAR2(2048) CONSTRAINT NN_DrainingJob_ReqId NOT NULL,
creationTime INTEGER CONSTRAINT NN_DrainingJob_CT NOT NULL,
lastModificationTime INTEGER CONSTRAINT NN_DrainingJob_LMT NOT NULL,
status INTEGER CONSTRAINT NN_DrainingJob_Status NOT NULL,
......@@ -1228,7 +1229,9 @@ CREATE TABLE DrainingErrors
(drainingJob INTEGER CONSTRAINT NN_DrainingErrors_DJ NOT NULL,
errorMsg VARCHAR2(2048) CONSTRAINT NN_DrainingErrors_ErrorMsg NOT NULL,
fileId INTEGER CONSTRAINT NN_DrainingErrors_FileId NOT NULL,
nsHost VARCHAR2(2048) CONSTRAINT NN_DrainingErrors_NsHost NOT NULL)
nsHost VARCHAR2(2048) CONSTRAINT NN_DrainingErrors_NsHost NOT NULL,
diskCopy INTEGER CONSTRAINT NN_DrainingErrors_DiskCopy NOT NULL,
timeStamp NUMBER CONSTRAINT NN_DrainingErrors_TimeStamp NOT NULL)
ENABLE ROW MOVEMENT;
CREATE INDEX I_DrainingErrors_DJ ON DrainingErrors (drainingJob);
......@@ -1238,6 +1241,12 @@ ALTER TABLE DrainingErrors
FOREIGN KEY (drainingJob)
REFERENCES DrainingJob (id);
/* Foreign key tying each DrainingErrors row to a DiskCopy row:
 * DrainingErrors.diskCopy must match an existing DiskCopy.id.
 */
ALTER TABLE DrainingErrors
ADD CONSTRAINT FK_DrainingErrors_DC
FOREIGN KEY (diskCopy)
REFERENCES DiskCopy (id);
/* Definition of the Disk2DiskCopyJob table. Each line is a disk2diskCopy job to process
* id : unique DB identifier for this job
* transferId : unique identifier for the transfer associated to this job
......
......@@ -20,8 +20,6 @@ END;
/* handle the creation of the Disk2DiskCopyJobs for the running drainingJobs */
CREATE OR REPLACE PROCEDURE drainRunner AS
varNbFiles INTEGER;
varNbBytes INTEGER;
varNbRunningJobs INTEGER;
varMaxNbOfSchedD2dPerDrain INTEGER;
varUnused INTEGER;
......@@ -44,8 +42,6 @@ BEGIN
logToDLF(NULL, dlf.LVL_SYSTEM, dlf.DRAINING_REFILL, 0, '', 'stagerd',
'svcClass=' || getSvcClassName(dj.svcClass) || ' DrainReq=' ||
TO_CHAR(dj.id) || ' MaxNewJobsCount=' || TO_CHAR(varMaxNbOfSchedD2dPerDrain-varNbRunningJobs));
varNbFiles := 0;
varNbBytes := 0;
FOR F IN (SELECT * FROM
(SELECT CastorFile.id cfId, Castorfile.nsOpenTime, DiskCopy.id dcId, CastorFile.fileSize
FROM DiskCopy, CastorFile
......@@ -55,19 +51,15 @@ BEGIN
CastorFile.tapeStatus IN (dconst.CASTORFILE_NOTONTAPE, dconst.CASTORFILE_DISKONLY)) OR
(dj.fileMask = dconst.DRAIN_FILEMASK_ALL))
AND DiskCopy.status = dconst.DISKCOPY_VALID
AND NOT EXISTS (SELECT 1 FROM Disk2DiskCopyJob WHERE castorFile=CastorFile.id)
AND NOT EXISTS (SELECT 1 FROM Disk2DiskCopyJob WHERE castorFile = CastorFile.id)
AND NOT EXISTS (SELECT 1 FROM DrainingErrors WHERE diskCopy = DiskCopy.id)
ORDER BY DiskCopy.importance DESC)
WHERE ROWNUM <= varMaxNbOfSchedD2dPerDrain-varNbRunningJobs) LOOP
createDisk2DiskCopyJob(F.cfId, F.nsOpenTime, dj.svcClass, dj.euid, dj.egid,
dconst.REPLICATIONTYPE_DRAINING, F.dcId, dj.id, FALSE);
varNbFiles := varNbFiles + 1;
varNbBytes := varNbBytes + F.fileSize;
END LOOP;
-- commit and update counters
UPDATE DrainingJob
SET totalFiles = totalFiles + varNbFiles,
totalBytes = totalBytes + varNbBytes,
lastModificationTime = getTime()
SET lastModificationTime = getTime()
WHERE id = dj.id;
COMMIT;
EXCEPTION WHEN CONSTRAINT_VIOLATED THEN
......@@ -99,6 +91,7 @@ BEGIN
FOR dj IN (SELECT id, fileSystem FROM DrainingJob WHERE status = dconst.DRAININGJOB_SUBMITTED) LOOP
UPDATE DrainingJob SET status = dconst.DRAININGJOB_STARTING WHERE id = dj.id;
COMMIT;
-- Compute totals now. Jobs will be later added in bunches by drainRunner
SELECT count(*), SUM(diskCopySize) INTO varTFiles, varTBytes
FROM DiskCopy
WHERE fileSystem = dj.fileSystem
......@@ -242,7 +235,7 @@ BEGIN
END LOOP;
-- Create the drain manager job to be executed every minute. This one starts and clean up draining jobs
-- Create the drain manager job to be executed every minute. This one starts and cleans up draining jobs
DBMS_SCHEDULER.CREATE_JOB(
JOB_NAME => 'drainManagerJob',
JOB_TYPE => 'PLSQL_BLOCK',
......
......@@ -336,7 +336,6 @@ BEGIN
-- update DrainingJob
UPDATE DrainingJob
SET status = varStatus,
totalFiles = varTotalFiles,
nbFailedBytes = varNbFailedBytes,
nbSuccessBytes = varNbSuccessBytes,
nbFailedFiles = varNbFailedFiles,
......@@ -375,6 +374,8 @@ CREATE OR REPLACE PROCEDURE disk2DiskCopyEnded
BEGIN
varLogMsg := CASE WHEN inErrorMessage IS NULL THEN dlf.D2D_D2DDONE_OK ELSE dlf.D2D_D2DFAILED END;
BEGIN
-- Parse destination path
parsePath(inDestDsName ||':'|| inDestPath, varDestFsId, varDestPath, varDestDcId, varFileId, varNsHost);
-- Get data from the disk2DiskCopy Job
SELECT castorFile, ouid, ogid, destDcId, destSvcClass, replicationType,
replacedDcId, retryCounter, drainingJob
......@@ -382,21 +383,33 @@ BEGIN
varReplacedDcId, varRetryCounter, varDrainingJob
FROM Disk2DiskCopyJob
WHERE transferId = inTransferId;
-- lock the castor file (and get logging info)
SELECT fileid, nsHost, fileSize INTO varFileId, varNsHost, varFileSize
FROM CastorFile
WHERE id = varCfId
FOR UPDATE;
EXCEPTION WHEN NO_DATA_FOUND THEN
-- two possibilities here :
-- - disk2diskCopyJob not found. It was probably canceled.
-- - the castorFile has disappeared before we locked it, ant the
-- disk2diskCopyJob too as we have a foreign key constraint.
-- So our brand new copy has to be created as invalid to trigger GC.
-- The job was probably canceled: so our brand new copy
-- has to be created as invalid to trigger GC, and linked
-- to the (hopefully existing) correct CastorFile.
varNewDcStatus := dconst.DISKCOPY_INVALID;
varLogMsg := dlf.D2D_D2DDONE_CANCEL;
varDestDcId := ids_seq.nextval;
BEGIN
SELECT id INTO varCfId
FROM CastorFile
WHERE fileId = varFileId;
EXCEPTION WHEN NO_DATA_FOUND THEN
-- Here we also lost the CastorFile: this could happen
-- if the GC ran meanwhile. Fail and leave dark data behind,
-- the GC will eventually catch up. A full solution would be
-- to gather here all missing information to correctly
-- recreate the CastorFile entry, but this is too complex
-- for what we would gain.
logToDLF(NULL, dlf.LVL_NOTICE, dlf.D2D_D2DDONE_CANCEL, varFileId, varNsHost, 'transfermanagerd',
'transferId=' || inTransferId || ' errorMessage="CastorFile disappeared, giving up"');
RETURN;
END;
END;
-- lock the castor file (and get logging info)
SELECT fileid, nsHost, fileSize INTO varFileId, varNsHost, varFileSize
FROM CastorFile
WHERE id = varCfId
FOR UPDATE;
-- check the filesize
IF inReplicaFileSize != varFileSize THEN
-- replication went wrong !
......@@ -406,10 +419,10 @@ BEGIN
END IF;
END IF;
-- Log success or failure of the replication
varComment := 'transferId=' || inTransferId ||
' destSvcClass=' || getSvcClassName(varDestSvcClass) ||
' dstDcId=' || TO_CHAR(varDestDcId) || ' destPath=' || inDestDsName || ':' || inDestPath ||
' euid=' || TO_CHAR(varUid) || ' egid=' || TO_CHAR(varGid) ||
varComment := 'transferId="' || inTransferId ||
'" destSvcClass=' || getSvcClassName(varDestSvcClass) ||
' dstDcId=' || TO_CHAR(varDestDcId) || ' destPath="' || inDestDsName || ':' || inDestPath ||
'" euid=' || TO_CHAR(varUid) || ' egid=' || TO_CHAR(varGid) ||
' fileSize=' || TO_CHAR(varFileSize);
IF inErrorMessage IS NOT NULL THEN
varComment := varComment || ' replicaFileSize=' || TO_CHAR(inReplicaFileSize) ||
......@@ -418,13 +431,6 @@ BEGIN
logToDLF(NULL, dlf.LVL_SYSTEM, varLogMsg, varFileId, varNsHost, 'transfermanagerd', varComment);
-- if success, create new DiskCopy, restart waiting requests, cleanup and handle replicate on close
IF inErrorMessage IS NULL THEN
-- get filesystem of the diskcopy and parse diskcopy path
SELECT FileSystem.id, SUBSTR(inDestPath, LENGTH(FileSystem.mountPoint)+1)
INTO varDestFsId, varDestPath
FROM DiskServer, FileSystem
WHERE DiskServer.name = inDestDsName
AND FileSystem.diskServer = DiskServer.id
AND INSTR(inDestPath, FileSystem.mountPoint) = 1;
-- compute GcWeight and importance of the new copy
IF varNewDcStatus = dconst.DISKCOPY_VALID THEN
DECLARE
......@@ -471,6 +477,7 @@ BEGIN
updateDrainingJobOnD2dEnd(varDrainingJob, varFileSize, False);
END IF;
ELSE
-- failure
DECLARE
varMaxNbD2dRetries INTEGER := TO_NUMBER(getConfigOption('D2dCopy', 'MaxNbRetries', 2));
BEGIN
......@@ -483,7 +490,7 @@ BEGIN
SET status = dconst.DISK2DISKCOPYJOB_PENDING,
retryCounter = varRetryCounter + 1
WHERE transferId = inTransferId;
logToDLF(NULL, dlf.LVL_SYSTEM, dlf.D2D_D2DDONE_RETRIED, varFileId, varNsHost, 'stagerd', varComment ||
logToDLF(NULL, dlf.LVL_SYSTEM, dlf.D2D_D2DDONE_RETRIED, varFileId, varNsHost, 'transfermanagerd', varComment ||
' RetryNb=' || TO_CHAR(varRetryCounter+1) || ' maxNbRetries=' || TO_CHAR(varMaxNbD2dRetries));
ELSE
-- no retry, let's delete the disk to disk job copy
......@@ -491,15 +498,15 @@ BEGIN
DELETE FROM Disk2DiskCopyjob WHERE transferId = inTransferId;
-- and remember the error in case of draining
IF varDrainingJob IS NOT NULL THEN
INSERT INTO DrainingErrors (drainingJob, errorMsg, fileId, nsHost)
VALUES (varDrainingJob, inErrorMessage, varFileId, varNsHost);
INSERT INTO DrainingErrors (drainingJob, errorMsg, fileId, nsHost, diskCopy, timeStamp)
VALUES (varDrainingJob, inErrorMessage, varFileId, varNsHost, varDestDcId, getTime());
END IF;
EXCEPTION WHEN NO_DATA_FOUND THEN
-- the Disk2DiskCopyjob was already dropped (e.g. because of an interrupted draining)
-- in such a case, forget about the error
NULL;
END;
logToDLF(NULL, dlf.LVL_NOTICE, dlf.D2D_D2DDONE_NORETRY, varFileId, varNsHost, 'stagerd', varComment ||
logToDLF(NULL, dlf.LVL_NOTICE, dlf.D2D_D2DDONE_NORETRY, varFileId, varNsHost, 'transfermanagerd', varComment ||
' maxNbRetries=' || TO_CHAR(varMaxNbD2dRetries));
-- Fail waiting subrequests
UPDATE SubRequest
......@@ -549,7 +556,7 @@ BEGIN
AND status = dconst.DISK2DISKCOPYJOB_SCHEDULED;
EXCEPTION WHEN NO_DATA_FOUND THEN
-- log "disk2DiskCopyStart : Replication request canceled while queuing in scheduler or transfer already started"
logToDLF(NULL, dlf.LVL_USER_ERROR, dlf.D2D_CANCELED_AT_START, inFileId, inNsHost, 'stagerd',
logToDLF(NULL, dlf.LVL_USER_ERROR, dlf.D2D_CANCELED_AT_START, inFileId, inNsHost, 'transfermanagerd',
'TransferId=' || TO_CHAR(inTransferId) || ' destDiskServer=' || inDestDiskServerName ||
' destMountPoint=' || inDestMountPoint || ' srcDiskServer=' || inSrcDiskServerName ||
' srcMountPoint=' || inSrcMountPoint);
......@@ -571,7 +578,7 @@ BEGIN
AND DiskCopy.castorFile = varCfId;
EXCEPTION WHEN NO_DATA_FOUND THEN
-- log "disk2DiskCopyStart : Source has disappeared while queuing in scheduler, retrying"
logToDLF(NULL, dlf.LVL_SYSTEM, dlf.D2D_SOURCE_GONE, inFileId, inNsHost, 'stagerd',
logToDLF(NULL, dlf.LVL_SYSTEM, dlf.D2D_SOURCE_GONE, inFileId, inNsHost, 'transfermanagerd',
'TransferId=' || TO_CHAR(inTransferId) || ' destDiskServer=' || inDestDiskServerName ||
' destMountPoint=' || inDestMountPoint || ' srcDiskServer=' || inSrcDiskServerName ||
' srcMountPoint=' || inSrcMountPoint);
......@@ -584,7 +591,7 @@ BEGIN
IF (varSrcDsStatus = dconst.DISKSERVER_DISABLED OR varSrcFsStatus = dconst.FILESYSTEM_DISABLED
OR varSrcHwOnline = 0) THEN
-- log "disk2DiskCopyStart : Source diskserver/filesystem was DISABLED meanwhile"
logToDLF(NULL, dlf.LVL_WARNING, dlf.D2D_SRC_DISABLED, inFileId, inNsHost, 'stagerd',
logToDLF(NULL, dlf.LVL_WARNING, dlf.D2D_SRC_DISABLED, inFileId, inNsHost, 'transfermanagerd',
'TransferId=' || TO_CHAR(inTransferId) || ' diskServer=' || inSrcDiskServerName ||
' fileSystem=' || inSrcMountPoint);
-- fail d2d transfer
......@@ -604,7 +611,7 @@ BEGIN
IF (varDestDsStatus != dconst.DISKSERVER_PRODUCTION OR varDestFsStatus != dconst.FILESYSTEM_PRODUCTION
OR varDestHwOnline = 0) THEN
-- log "disk2DiskCopyStart : Destination diskserver/filesystem not in PRODUCTION any longer"
logToDLF(NULL, dlf.LVL_WARNING, dlf.D2D_DEST_NOT_PRODUCTION, inFileId, inNsHost, 'stagerd',
logToDLF(NULL, dlf.LVL_WARNING, dlf.D2D_DEST_NOT_PRODUCTION, inFileId, inNsHost, 'transfermanagerd',
'TransferId=' || TO_CHAR(inTransferId) || ' diskServer=' || inDestDiskServerName);
-- fail d2d transfer
disk2DiskCopyEnded(inTransferId, '', '', 0, 0, 'Destination not in production');
......@@ -622,7 +629,7 @@ BEGIN
AND DiskCopy.status = dconst.DISKCOPY_VALID;
IF varNbCopies > 0 THEN
-- log "disk2DiskCopyStart : Multiple copies of this file already found on this diskserver"
logToDLF(NULL, dlf.LVL_ERROR, dlf.D2D_MULTIPLE_COPIES_ON_DS, inFileId, inNsHost, 'stagerd',
logToDLF(NULL, dlf.LVL_ERROR, dlf.D2D_MULTIPLE_COPIES_ON_DS, inFileId, inNsHost, 'transfermanagerd',
'TransferId=' || TO_CHAR(inTransferId) || ' diskServer=' || inDestDiskServerName);
-- fail d2d transfer
disk2DiskCopyEnded(inTransferId, '', '', 0, 0, 'Copy found on diskserver');
......@@ -645,7 +652,7 @@ BEGIN
outSrcDcPath := inSrcDiskServerName || ':' || inSrcMountPoint || outSrcDcPath;
-- log "disk2DiskCopyStart called and returned successfully"
logToDLF(NULL, dlf.LVL_SYSTEM, dlf.D2D_START_OK, inFileId, inNsHost, 'stagerd',
logToDLF(NULL, dlf.LVL_SYSTEM, dlf.D2D_START_OK, inFileId, inNsHost, 'transfermanagerd',
'TransferId=' || TO_CHAR(inTransferId) || ' srcPath=' || outSrcDcPath ||
' destPath=' || outDestDcPath);
END;
......
......@@ -357,19 +357,20 @@ def queryFailures():
fileSystems = getFileSystems(stcur)
# build SQL statement
stGetFailures = '''
SELECT DiskServer.name, fileSystem.mountPoint,
DrainingErrors.fileId, DrainingErrors.errorMsg
FROM DiskServer, FileSystem, DrainingJob, DrainingErrors
SELECT DiskServer.name, fileSystem.mountPoint || DiskCopy.path as filePath,
DrainingErrors.timestamp, DrainingErrors.errorMsg
FROM DiskServer, FileSystem, DrainingJob, DrainingErrors, DiskCopy
WHERE FileSystem.diskServer = DiskServer.id
AND DrainingJob.fileSystem = FileSystem.id
AND DrainingErrors.drainingJob = DrainingJob.id'''
AND DrainingErrors.drainingJob = DrainingJob.id
AND DrainingErrors.diskCopy = DiskCopy.id'''
if fileSystems:
stGetFailures += ' AND FileSystem.id IN (' + ', '.join([str(fs[2]) for fs in fileSystems]) + ')'
# get data
stcur.execute(stGetFailures)
data = stcur.fetchall()
# pretty printing
titles = ('DiskServer', 'MountPoint', 'fileId', 'Error')
titles = ('DiskServer', 'FilePath', 'Timestamp', 'Error')
castor_tools.prettyPrintTable(titles, data)
finally:
# close DB connection
......@@ -438,7 +439,8 @@ def queryDrain():
zr = zip(*results)
totFiles, totBytes, totFailedBytes, totSuccessBytes, totFailedFiles, totSuccessFiles = \
sum(zr[6]), sum(zr[7]), sum(zr[11]), sum(zr[12]), sum(zr[13]), sum(zr[14])
data.append(('', '', '', totFiles, castor_tools.nbToDataAmount(totBytes),
data.append(('', 'TOTAL', castor_tools.secsToDate(currentTime),
totFiles, castor_tools.nbToDataAmount(totBytes),
totFiles-totFailedFiles-totSuccessFiles,
castor_tools.nbToDataAmount(totBytes-totFailedBytes-totSuccessBytes),
totSuccessFiles, totFailedFiles, '',
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment