/*******************************************************************
 *
 * PL/SQL code for the stager and resource monitoring
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *******************************************************************/

/* PL/SQL declaration for the castor package.
 * It only declares record types and the strongly typed REF CURSOR
 * types built on them, through which the stager procedures return
 * result sets to their callers. */
CREATE OR REPLACE PACKAGE castor AS
  -- A DiskCopy with its path and its location (mount point, disk server)
  TYPE DiskCopyCore IS RECORD (
    id INTEGER,
    path VARCHAR2(2048),
    status NUMBER,
    fsWeight NUMBER,
    mountPoint VARCHAR2(2048),
    diskServer VARCHAR2(2048));
  TYPE DiskCopy_Cur IS REF CURSOR RETURN DiskCopyCore;
  -- Associative (index-by) arrays of strings and of numbers
  TYPE "strList" IS TABLE OF VARCHAR2(2048) index BY binary_integer;
  TYPE "cnumList" IS TABLE OF NUMBER index BY binary_integer;
  -- One line of a file query result
  TYPE QueryLine IS RECORD (
    fileid INTEGER,
    nshost VARCHAR2(2048),
    diskCopyId INTEGER,
    diskCopyPath VARCHAR2(2048),
    filesize INTEGER,
    diskCopyStatus INTEGER,
    diskServerName VARCHAR2(2048),
    fileSystemMountPoint VARCHAR2(2048),
    nbaccesses INTEGER,
    lastKnownFileName VARCHAR2(2048),
    creationTime INTEGER,
    svcClass VARCHAR2(2048),
    lastAccessTime INTEGER,
    isOnDrainingHardware INTEGER);
  TYPE QueryLine_Cur IS REF CURSOR RETURN QueryLine;
  -- A file identified by its (fileId, nsHost) pair
  TYPE FileList IS RECORD (
    fileId NUMBER,
    nsHost VARCHAR2(2048));
  TYPE FileList_Cur IS REF CURSOR RETURN FileList;
  -- One line of a single disk pool query.
  -- NOTE(review): isDP/isDS presumably flag pool / server summary lines; confirm with callers
  TYPE DiskPoolQueryLine IS RECORD (
    isDP INTEGER,
    isDS INTEGER,
    diskServerName VARCHAR2(2048),
    diskServerStatus INTEGER,
    fileSystemmountPoint VARCHAR2(2048),
    fileSystemfreeSpace INTEGER,
    fileSystemtotalSpace INTEGER,
    fileSystemminfreeSpace INTEGER,
    fileSystemmaxFreeSpace INTEGER,
    fileSystemStatus INTEGER);
  TYPE DiskPoolQueryLine_Cur IS REF CURSOR RETURN DiskPoolQueryLine;
  -- Same as DiskPoolQueryLine, plus the name of the disk pool
  TYPE DiskPoolsQueryLine IS RECORD (
    isDP INTEGER,
    isDS INTEGER,
    diskPoolName VARCHAR2(2048),
    diskServerName VARCHAR2(2048),
    diskServerStatus INTEGER,
    fileSystemmountPoint VARCHAR2(2048),
    fileSystemfreeSpace INTEGER,
    fileSystemtotalSpace INTEGER,
    fileSystemminfreeSpace INTEGER,
    fileSystemmaxFreeSpace INTEGER,
    fileSystemStatus INTEGER);
  TYPE DiskPoolsQueryLine_Cur IS REF CURSOR RETURN DiskPoolsQueryLine;
  -- Single id / uuid / uuid pair results
  TYPE IDRecord IS RECORD (id INTEGER);
  TYPE IDRecord_Cur IS REF CURSOR RETURN IDRecord;
  TYPE UUIDRecord IS RECORD (uuid VARCHAR2(2048));
  TYPE UUIDRecord_Cur IS REF CURSOR RETURN UUIDRecord;
  TYPE UUIDPairRecord IS RECORD (uuid1 VARCHAR2(2048), uuid2 VARCHAR2(2048));
  TYPE UUIDPairRecord_Cur IS REF CURSOR RETURN UUIDPairRecord;
  TYPE TransferRecord IS RECORD (subreId VARCHAR2(2048), resId VARCHAR2(2048), fileId NUMBER, nsHost VARCHAR2(2048));
  TYPE TransferRecord_Cur IS REF CURSOR RETURN TransferRecord;
  -- A list of strings, one per line
  TYPE StringValue IS RECORD (value VARCHAR2(2048));
  TYPE StringList_Cur IS REF CURSOR RETURN StringValue;
  TYPE FileEntry IS RECORD (
    fileid INTEGER,
    nshost VARCHAR2(2048));
  TYPE FileEntry_Cur IS REF CURSOR RETURN FileEntry;
  -- Tape access priority of a given (euid, egid) pair
  TYPE TapeAccessPriority IS RECORD (
    euid INTEGER,
    egid INTEGER,
    priority INTEGER);
  TYPE TapeAccessPriority_Cur IS REF CURSOR RETURN TapeAccessPriority;
  TYPE StreamReport IS RECORD (
    diskserver VARCHAR2(2048),
    mountPoint VARCHAR2(2048));
  TYPE StreamReport_Cur IS REF CURSOR RETURN StreamReport;
  -- Per file outcome of a bulk operation
  TYPE FileResult IS RECORD (
    fileid INTEGER,
    nshost VARCHAR2(2048),
    errorcode INTEGER,
    errormessage VARCHAR2(2048));
  TYPE FileResult_Cur IS REF CURSOR RETURN FileResult;
  -- Per DiskCopy outcome of an operation
  TYPE DiskCopyResult IS RECORD (
    dcId INTEGER,
    fileId INTEGER,
    msg VARCHAR2(2048),
    retCode INTEGER);
  TYPE DiskCopyResult_Cur IS REF CURSOR RETURN DiskCopyResult;
  -- One log message
  TYPE LogEntry IS RECORD (
    timeinfo NUMBER,
    uuid VARCHAR2(2048),
    priority INTEGER,
    msg VARCHAR2(2048),
    fileId NUMBER,
    nsHost VARCHAR2(2048),
    source VARCHAR2(2048),
    params VARCHAR2(2048));
  TYPE LogEntry_Cur IS REF CURSOR RETURN LogEntry;
END castor;
/

/* Registers every newly created FileSystem in FileSystemsToCheck,
   with its check flag cleared (toBeChecked = 0) */
CREATE OR REPLACE TRIGGER tr_FileSystem_Insert
BEFORE INSERT ON FileSystem
FOR EACH ROW
BEGIN
  INSERT INTO FileSystemsToCheck (FileSystem, ToBeChecked)
  VALUES (:new.id, 0);
END;
/

/* Drops the FileSystemsToCheck bookkeeping of a FileSystem
   that is being deleted */
CREATE OR REPLACE TRIGGER tr_FileSystem_Delete
BEFORE DELETE ON FileSystem
FOR EACH ROW
BEGIN
  DELETE FROM FileSystemsToCheck
   WHERE FileSystem = :old.id;
END;
/
/* Checks consistency of DiskCopies when a FileSystem comes
 * back in production after a period spent in a DRAINING or a
 * DISABLED status.
 * Current checks/fixes include :
 *   - Canceling recalls for files that are VALID
 *     on the fileSystem that comes back. (Scheduled for bulk
 *     operation : the filesystem is only flagged here, the work
 *     itself is done by bulkCheckFSBackInProd)
 *   - Dealing with files that are STAGEOUT on the fileSystem
 *     coming back but already exist on another one : the local
 *     STAGEOUT copy is invalidated
 *
 * @param fsId id of the FileSystem coming back into production
 */
CREATE OR REPLACE PROCEDURE checkFSBackInProd(fsId NUMBER) AS
BEGIN
  -- Flag the filesystem for processing in a bulk operation later.
  -- We need to do this because some operations are database intensive
  -- and therefore it is often better to process several filesystems
  -- simultaneous with one query as opposed to one by one. Especially
  -- where full table scans are involved.
  UPDATE FileSystemsToCheck SET toBeChecked = 1
   WHERE fileSystem = fsId;
  -- Look for files that are STAGEOUT on the filesystem coming back to life
  -- but already VALID/WAITFS/STAGEOUT/WAITFS_SCHEDULING somewhere else.
  -- NOTE(review): the decode(D.status, 6, ...) form presumably matches the
  -- function-based index I_DiskCopy_Status6 named in the hint (its
  -- definition is not visible here), which would make 6 the value of
  -- dconst.DISKCOPY_STAGEOUT; keep both in sync.
  FOR cf IN (SELECT /*+ USE_NL(D E) INDEX(D I_DiskCopy_Status6) */
                    UNIQUE D.castorfile, D.id dcId
               FROM DiskCopy D, DiskCopy E
              WHERE D.castorfile = E.castorfile
                AND D.fileSystem = fsId
                AND E.fileSystem != fsId
                AND decode(D.status,6,D.status,NULL) = dconst.DISKCOPY_STAGEOUT
                AND E.status IN (dconst.DISKCOPY_VALID,
                                 dconst.DISKCOPY_WAITFS, dconst.DISKCOPY_STAGEOUT,
                                 dconst.DISKCOPY_WAITFS_SCHEDULING)) LOOP
    -- Invalidate the local STAGEOUT DiskCopy
    UPDATE DiskCopy
       SET status = dconst.DISKCOPY_INVALID
     WHERE id = cf.dcId;
  END LOOP;
END;
/

/* PL/SQL method implementing bulkCheckFSBackInProd for processing
 * filesystems in one bulk operation to optimise database performance.
 * Takes every FileSystem flagged by checkFSBackInProd (toBeChecked = 1)
 * and clears its flag. Then, for each CastorFile that has both a VALID
 * DiskCopy on one of these filesystems and a RecallJob, it cancels the
 * recall and restarts the SubRequests waiting on it.
 * Commits after each CastorFile. When nothing was flagged, or no file
 * needs fixing, the flag reset is left to the caller's transaction.
 */
CREATE OR REPLACE PROCEDURE bulkCheckFSBackInProd AS
  fsIds "numList";
BEGIN
  -- Extract a list of filesystems which have been scheduled to be
  -- checked in a bulk operation on the database.
  UPDATE FileSystemsToCheck SET toBeChecked = 0
   WHERE toBeChecked = 1
  RETURNING fileSystem BULK COLLECT INTO fsIds;
  -- Nothing found, return
  IF fsIds.COUNT = 0 THEN
    RETURN;
  END IF;
  -- Look for recalls concerning files that are VALID
  -- on all filesystems scheduled to be checked, and restart their
  -- subrequests (reconsidering the recall source).
  -- The loop record is not called "file" as FILE is an Oracle SQL reserved
  -- word and the record is referenced from embedded SQL below.
  FOR cfRec IN (SELECT UNIQUE DiskCopy.castorFile
                  FROM DiskCopy, RecallJob
                 WHERE DiskCopy.castorfile = RecallJob.castorfile
                   AND DiskCopy.fileSystem IN
                     (SELECT /*+ CARDINALITY(fsidTable 5) */ COLUMN_VALUE
                        FROM TABLE(fsIds) fsidTable)
                   AND DiskCopy.status = dconst.DISKCOPY_VALID) LOOP
    -- cancel the recall for that file
    deleteRecallJobs(cfRec.castorFile);
    -- restart subrequests that were waiting on the recall
    UPDATE SubRequest
       SET status = dconst.SUBREQUEST_RESTART
     WHERE castorFile = cfRec.castorFile
       AND status = dconst.SUBREQUEST_WAITTAPERECALL;
    -- commit that file
    COMMIT;
  END LOOP;
END;
/

/* SQL statement for the update trigger on the FileSystem table.
 * Fires only when the status really changes (a NULL old or new status does
 * not satisfy the WHEN clause). When the filesystem enters PRODUCTION from
 * any other status, checkFSBackInProd is called on it.
 */
CREATE OR REPLACE TRIGGER tr_FileSystem_Update
BEFORE UPDATE OF status ON FileSystem
FOR EACH ROW WHEN (old.status != new.status)
BEGIN
  -- If the filesystem is coming back into PRODUCTION, initiate a consistency
  -- check for the diskcopies which reside on the filesystem.
  IF :old.status != dconst.FILESYSTEM_PRODUCTION AND
     :new.status = dconst.FILESYSTEM_PRODUCTION THEN
    checkFsBackInProd(:old.id);
  END IF;
END;
/

/* SQL statement for the update trigger on the DiskServer table.
 * Fires only when the status really changes. When a diskserver with its
 * hardware online (hwOnline = 1) enters PRODUCTION from any other status,
 * checkFSBackInProd is called on each of its filesystems that is itself
 * in PRODUCTION.
 * NOTE(review): as the trigger is restricted to UPDATE OF status, a change
 * of hwOnline alone never starts these checks.
 */
CREATE OR REPLACE TRIGGER tr_DiskServer_Update
BEFORE UPDATE OF status ON DiskServer
FOR EACH ROW WHEN (old.status != new.status)
BEGIN
  -- If the diskserver is coming back into PRODUCTION, initiate a consistency
  -- check for all the diskcopies on its associated filesystems which are in
  -- PRODUCTION.
  IF :old.status != dconst.DISKSERVER_PRODUCTION AND
     :new.status = dconst.DISKSERVER_PRODUCTION AND :new.hwOnline = 1 THEN
    FOR fs IN (SELECT id FROM FileSystem
                WHERE diskServer = :old.id
                  AND status = dconst.FILESYSTEM_PRODUCTION)
    LOOP
      checkFsBackInProd(fs.id);
    END LOOP;
  END IF;
END;
/

/* Trigger used to check if the maxReplicaNb has been exceeded
 * after a diskcopy became VALID.
 *
 * The (svcClass, castorFile) pairs to check are queued in
 * TooManyReplicasHelper by the row level trigger tr_DiskCopy_Created
 * (on INSERT of a VALID DiskCopy), and processed here at the end of the
 * next statement updating DiskCopy.status.
 *
 * For each pair, the VALID copies reachable from the service class, through
 * DiskPool2SvcClass or DataPool2SvcClass, are ranked. Every copy ranked
 * beyond the service class' maxReplicaNb is invalidated with gcType
 * GCTYPE_TOOMANYREPLICAS, but the last VALID copy is never dropped.
 *
 * Invalidating a copy is itself an UPDATE of DiskCopy.status, so this
 * trigger fires again (nested) while the outer candidate list is still
 * open. That list can therefore be stale, which is why the last-copy check
 * is re-evaluated before each invalidation. It must count exactly the same
 * set of copies as the candidate list: disk pools AND data pools.
 */
CREATE OR REPLACE TRIGGER tr_DiskCopy_Stmt_Online
AFTER UPDATE OF STATUS ON DISKCOPY
DECLARE
  -- not named maxReplicaNb to avoid any confusion with the SvcClass column
  varMaxReplicaNb NUMBER;
  varUnused NUMBER;
  varNbValidCopies NUMBER;
BEGIN
  -- Loop over the (svcClass, castorFile) pairs to be processed
  FOR a IN (SELECT * FROM TooManyReplicasHelper)
  LOOP
    -- Lock the castorfile. This shouldn't be necessary as the procedure that
    -- caused the trigger to be executed should already have the lock.
    -- Nevertheless, we make sure!
    SELECT id INTO varUnused FROM CastorFile
     WHERE id = a.castorfile FOR UPDATE;
    -- Get the max replica number of the service class
    SELECT maxReplicaNb INTO varMaxReplicaNb
      FROM SvcClass WHERE id = a.svcclass;
    -- Produce a list of diskcopies to invalidate should too many replicas be
    -- online. A NULL varMaxReplicaNb selects nothing.
    FOR b IN (SELECT id FROM (
                SELECT rownum ind, id FROM (
                  SELECT * FROM (
                    SELECT /*+ INDEX_RS_ASC (DiskCopy I_DiskCopy_Castorfile) */
                           FileSystem.status AS FsStatus, DiskServer.status AS DsStatus,
                           DiskCopy.gcWeight, DiskCopy.id
                      FROM DiskCopy, FileSystem, DiskPool2SvcClass,
                           DiskServer
                     WHERE DiskCopy.filesystem = FileSystem.id
                       AND FileSystem.diskpool = DiskPool2SvcClass.parent
                       AND FileSystem.diskserver = DiskServer.id
                       AND DiskPool2SvcClass.child = a.svcclass
                       AND DiskCopy.castorfile = a.castorfile
                       AND DiskCopy.status = dconst.DISKCOPY_VALID
                     UNION ALL
                    SELECT /*+ INDEX_RS_ASC (DiskCopy I_DiskCopy_Castorfile) */
                           0 AS FsStatus,
                           (SELECT MIN(status) FROM DiskServer
                             WHERE dataPool = DiskCopy.dataPool) AS DsStatus,
                           DiskCopy.gcWeight, DiskCopy.id
                      FROM DiskCopy, DataPool2SvcClass
                     WHERE DiskCopy.dataPool = DataPool2SvcClass.parent
                       AND DataPool2SvcClass.child = a.svcclass
                       AND DiskCopy.castorfile = a.castorfile
                       AND DiskCopy.status = dconst.DISKCOPY_VALID)
                   -- Copies whose filesystem and diskserver statuses are both 0
                   -- (presumably PRODUCTION) are ranked first, then by decreasing
                   -- gcWeight. The copies ranked beyond varMaxReplicaNb, i.e.
                   -- non-PRODUCTION hardware first, are the ones dropped.
                   ORDER BY decode(fsStatus, 0, decode(dsStatus, 0, 0, 1), 1) ASC, gcWeight DESC))
               WHERE ind > varMaxReplicaNb)
    LOOP
      -- Sanity check, make sure that the last copy is never dropped!
      -- Count the VALID copies of the file visible from the service class
      -- through disk pools AND data pools, like the candidate list above.
      -- Counting disk pools only returned 0 for data pool copies and let
      -- the last one be invalidated.
      SELECT COUNT(*) INTO varNbValidCopies FROM (
        SELECT /*+ INDEX_RS_ASC(DiskCopy I_DiskCopy_CastorFile) */ DiskCopy.id
          FROM DiskCopy, FileSystem, DiskPool2SvcClass, DiskServer
         WHERE DiskCopy.filesystem = FileSystem.id
           AND FileSystem.diskpool = DiskPool2SvcClass.parent
           AND FileSystem.diskserver = DiskServer.id
           AND DiskPool2SvcClass.child = a.svcclass
           AND DiskCopy.castorfile = a.castorfile
           AND DiskCopy.status = dconst.DISKCOPY_VALID
         UNION ALL
        SELECT /*+ INDEX_RS_ASC(DiskCopy I_DiskCopy_CastorFile) */ DiskCopy.id
          FROM DiskCopy, DataPool2SvcClass
         WHERE DiskCopy.dataPool = DataPool2SvcClass.parent
           AND DataPool2SvcClass.child = a.svcclass
           AND DiskCopy.castorfile = a.castorfile
           AND DiskCopy.status = dconst.DISKCOPY_VALID);
      IF varNbValidCopies <= 1 THEN
        EXIT;  -- Last file, so exit the loop
      END IF;
      -- Invalidate the diskcopy
      UPDATE DiskCopy
         SET status = dconst.DISKCOPY_INVALID,
             gcType = dconst.GCTYPE_TOOMANYREPLICAS
       WHERE id = b.id;
      -- update importance of remaining diskcopies
      UPDATE DiskCopy SET importance = importance + 1
       WHERE castorFile = a.castorfile
         AND status = dconst.DISKCOPY_VALID;
    END LOOP;
  END LOOP;
  -- cleanup the table so that we do not accumulate lines. This would trigger
  -- a n^2 behavior until the next commit.
  DELETE FROM TooManyReplicasHelper;
END;
/

/* Trigger used to provide input to the statement level trigger
 * tr_DiskCopy_Stmt_Online: every DiskCopy inserted as VALID gets its
 * (svcClass, castorFile) pair queued in TooManyReplicasHelper.
 * Replica management is skipped, never failing the INSERT, when the
 * DiskCopy's pool belongs to no service class or to several of them.
 */
CREATE OR REPLACE TRIGGER tr_DiskCopy_Created
AFTER INSERT ON DiskCopy
FOR EACH ROW
WHEN (new.status = 0) -- dconst.DISKCOPY_VALID
DECLARE
  svcId  NUMBER;
  -- Trap `ORA-00001: unique constraint violated` errors
  CONSTRAINT_VIOLATED EXCEPTION;
  PRAGMA EXCEPTION_INIT(CONSTRAINT_VIOLATED, -00001);
BEGIN
  -- Insert the information about the diskcopy being processed into
  -- the TooManyReplicasHelper. This information will be used later
  -- on the DiskCopy AFTER UPDATE statement level trigger. We cannot
  -- do the work of that trigger here as it would result in
  -- `ORA-04091: table is mutating, trigger/function` errors
  BEGIN
    -- The service class is found either through the disk pool of the
    -- FileSystem or through the data pool of the DiskCopy
    SELECT child INTO svcId FROM (
      SELECT DiskPool2SvcClass.child
        FROM FileSystem, DiskPool2SvcClass
       WHERE FileSystem.diskpool = DiskPool2SvcClass.parent
         AND FileSystem.id = :new.filesystem
       UNION ALL
      SELECT child
        FROM DataPool2SvcClass
       WHERE parent = :new.dataPool);
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      -- The DiskCopy's pool is not attached to any service class, so there
      -- is no maxReplicaNb to enforce. Letting ORA-01403 escape from this
      -- row trigger would make the INSERT of the DiskCopy itself fail.
      RETURN;
    WHEN TOO_MANY_ROWS THEN
      -- The DiskCopy belongs to multiple service classes which is not
      -- supported by the replica management trigger.
      RETURN;
  END;
  -- Insert an entry into the TooManyReplicasHelper table.
  BEGIN
    INSERT INTO TooManyReplicasHelper (svcClass, castorFile)
    VALUES (svcId, :new.castorfile);
  EXCEPTION WHEN CONSTRAINT_VIOLATED THEN
    RETURN;  -- Entry already exists!
  END;
END;
/

/* PL/SQL method to get the next SubRequest to be handled by the JobReqSvc service.
 * Takes the oldest SubRequest in START, RESTART or RETRY whose svcHandler is
 * 'JobReqSvc'. When none is available, it waits at most 3 seconds on the
 * 'wakeUpJobReqSvc' alert and looks once more.
 * The SubRequest taken is moved to WAITSCHED. Its details, together with
 * those of its request and client, are returned through the OUT parameters.
 *
 * Returns with outSrId NULL (and outErrNo = 0) when no SubRequest could be
 * taken.
 * If the service class of the request does not exist, the SubRequest is
 * archived as FAILED_FINISHED and outErrNo/outErrMsg are set to EINVAL.
 * If the request or client row is missing, or the request type is not
 * handled, the SubRequest is archived as FAILED_FINISHED, this is committed,
 * and error -20100 is raised.
 */
CREATE OR REPLACE PROCEDURE jobSubRequestToDo(outSrId OUT INTEGER, outReqUuid OUT VARCHAR2,
                                              outReqType OUT INTEGER,
                                              outEuid OUT INTEGER, outEgid OUT INTEGER,
                                              outFileName OUT VARCHAR2, outSvcClassName OUT VARCHAR2,
                                              outFileClassIfForced OUT INTEGER,
                                              outFlags OUT INTEGER, outModeBits OUT INTEGER,
                                              outClientIpAddress OUT INTEGER,
                                              outClientPort OUT INTEGER, outClientVersion OUT INTEGER,
                                              outErrNo OUT INTEGER, outErrMsg OUT VARCHAR2) AS
  -- Candidates : JobReqSvc SubRequests in START, RESTART or RETRY, oldest first
  CURSOR SRcur IS
    SELECT /*+ FIRST_ROWS_10 INDEX_RS_ASC(SR I_SubRequest_RT_CT_ID) */ SR.id
      FROM SubRequest PARTITION (P_STATUS_0_1_2) SR  -- START, RESTART, RETRY
     WHERE SR.svcHandler = 'JobReqSvc'
     ORDER BY SR.creationTime ASC;
  -- ORA-00054 : the row is already locked by someone else (FOR UPDATE NOWAIT)
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
  varSrId INTEGER;
  varRequestId INTEGER;
  varSvcClassId INTEGER;
  varClientId INTEGER;
  varUnusedMessage VARCHAR2(2048);
  varUnusedStatus INTEGER;
BEGIN
  outErrNo := 0;
  -- Open a cursor on potential candidates
  OPEN SRcur;
  -- Retrieve the first candidate
  FETCH SRcur INTO varSrId;
  IF SRcur%NOTFOUND THEN
    -- There is no candidate available. Wait for next alert for a maximum of 3 seconds.
    -- We do not wait forever in order to give the control back to the
    -- caller daemon in case it should exit.
    CLOSE SRcur;
    DBMS_ALERT.WAITONE('wakeUpJobReqSvc', varUnusedMessage, varUnusedStatus, 3);
    -- try again to find something now that we waited
    OPEN SRcur;
    FETCH SRcur INTO varSrId;
    IF SRcur%NOTFOUND THEN
      -- still nothing. We will give back the control to the application
      -- so that it can handle cases like signals and exit. We will probably
      -- be back soon :-)
      CLOSE SRcur;
      RETURN;
    END IF;
  END IF;
  -- Loop on candidates until we can lock one
  LOOP
    BEGIN
      -- Try to take a lock on the current candidate, and revalidate its status
      SELECT /*+ INDEX(SR PK_SubRequest_ID) */ id INTO varSrId
        FROM SubRequest PARTITION (P_STATUS_0_1_2) SR
       WHERE id = varSrId FOR UPDATE NOWAIT;
      -- Since we are here, we got the lock. We have our winner, let's update it
      UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
         SET status = dconst.SUBREQUEST_WAITSCHED, subReqId = nvl(subReqId, uuidGen())
       WHERE id = varSrId
      RETURNING id, reqType, fileName, flags, modeBits, request
        INTO outSrId, outReqType, outFileName, outFlags, outModeBits, varRequestId;
      EXIT;
    EXCEPTION
      WHEN NO_DATA_FOUND THEN
        -- Go to next candidate, this subrequest was processed already and its status changed
        NULL;
      WHEN SrLocked THEN
        -- Go to next candidate, this subrequest is being processed by another thread
        NULL;
    END;
    -- we are here because the current candidate could not be handled
    -- let's go to the next one
    FETCH SRcur INTO varSrId;
    IF SRcur%NOTFOUND THEN
      -- no next one ? then we can return
      CLOSE SRcur;
      RETURN;
    END IF;
  END LOOP;
  CLOSE SRcur;

  BEGIN
    -- XXX This could be done in a single EXECUTE IMMEDIATE statement, but to make it
    -- XXX efficient we implement a CASE construct. At a later time the FileRequests should
    -- XXX be merged in a single table (partitioned by reqType) to avoid the following block.
    -- An unhandled reqType raises CASE_NOT_FOUND, caught by the handler below.
    CASE
      WHEN outReqType = 40 THEN -- StagePutRequest
        SELECT reqId, euid, egid, svcClass, svcClassName, client
          INTO outReqUuid, outEuid, outEgid, varSvcClassId, outSvcClassName, varClientId
          FROM StagePutRequest WHERE id = varRequestId;
      WHEN outReqType = 35 THEN -- StageGetRequest
        SELECT reqId, euid, egid, svcClass, svcClassName, client
          INTO outReqUuid, outEuid, outEgid, varSvcClassId, outSvcClassName, varClientId
          FROM StageGetRequest WHERE id = varRequestId;
      WHEN outReqType = 37 THEN -- StagePrepareToPutRequest
        SELECT reqId, euid, egid, svcClass, svcClassName, client
          INTO outReqUuid, outEuid, outEgid, varSvcClassId, outSvcClassName, varClientId
          FROM StagePrepareToPutRequest WHERE id = varRequestId;
      WHEN outReqType = 36 THEN -- StagePrepareToGetRequest
        SELECT reqId, euid, egid, svcClass, svcClassName, client
          INTO outReqUuid, outEuid, outEgid, varSvcClassId, outSvcClassName, varClientId
          FROM StagePrepareToGetRequest WHERE id = varRequestId;
    END CASE;
    SELECT ipAddress, port, version
      INTO outClientIpAddress, outClientPort, outClientVersion
      FROM Client WHERE id = varClientId;
    BEGIN
      -- Forced file class of the service class, if any (outer join : NULL otherwise)
      SELECT FileClass.classId INTO outFileClassIfForced
        FROM SvcClass, FileClass
       WHERE SvcClass.id = varSvcClassId
         AND FileClass.id(+) = SvcClass.forcedFileClass;
    EXCEPTION WHEN NO_DATA_FOUND THEN
      -- the service class itself does not exist
      archiveSubReq(outSrId, dconst.SUBREQUEST_FAILED_FINISHED);
      outErrno := serrno.EINVAL;
      outErrMsg := 'Invalid service class ''' || outSvcClassName || '''';
    END;
  EXCEPTION WHEN OTHERS THEN
    -- Something went really wrong, our subrequest does not have the corresponding request or client,
    -- Just drop it and re-raise exception. Some rare occurrences have happened in the past,
    -- this catch-all logic protects the stager-scheduling system from getting stuck with a single such case.
    archiveSubReq(outSrId, dconst.SUBREQUEST_FAILED_FINISHED);
    COMMIT;
    raise_application_error(-20100, 'Request got corrupted and could not be processed : ' ||
                                    SQLCODE || ' -ERROR- ' || SQLERRM);
  END;
END;
/

/* PL/SQL method to get the next SubRequest to do according to the given service.
 * Takes the oldest SubRequest in START, RESTART or RETRY whose svcHandler is
 * `service`. When none is available, it waits at most 3 seconds on the
 * 'wakeUp'||service alert and looks once more.
 * The SubRequest taken is moved to WAITSCHED. Its details, together with
 * those of its request (StagePutDoneRequest, StageRmRequest or
 * SetFileGCWeight) and of its client, are returned through the OUT
 * parameters.
 *
 * All OUT parameters are left NULL when no SubRequest could be taken.
 * If the request or client row is missing, or the request type is not
 * handled, the SubRequest is archived as FAILED_FINISHED, this is committed,
 * and error -20100 is raised.
 */
CREATE OR REPLACE PROCEDURE subRequestToDo(service IN VARCHAR2,
                                           srId OUT INTEGER, srRetryCounter OUT INTEGER, srFileName OUT VARCHAR2,
                                           srProtocol OUT VARCHAR2, srXsize OUT INTEGER,
                                           srModeBits OUT INTEGER, srFlags OUT INTEGER,
                                           srSubReqId OUT VARCHAR2, srReqType OUT INTEGER,
                                           rId OUT INTEGER, rFlags OUT INTEGER, rUsername OUT VARCHAR2, rEuid OUT INTEGER,
                                           rEgid OUT INTEGER, rMask OUT INTEGER, rPid OUT INTEGER, rMachine OUT VARCHAR2,
                                           rSvcClassName OUT VARCHAR2, rUserTag OUT VARCHAR2, rReqId OUT VARCHAR2,
                                           rCreationTime OUT INTEGER, rLastModificationTime OUT INTEGER,
                                           rRepackVid OUT VARCHAR2, rGCWeight OUT INTEGER,
                                           clIpAddress OUT INTEGER, clPort OUT INTEGER, clVersion OUT INTEGER) AS
  -- Candidates : SubRequests of the given service in START, RESTART or RETRY, oldest first
  CURSOR SRcur IS
    SELECT /*+ FIRST_ROWS_10 INDEX_RS_ASC(SR I_SubRequest_RT_CT_ID) */ SR.id
      FROM SubRequest PARTITION (P_STATUS_0_1_2) SR  -- START, RESTART, RETRY
     WHERE SR.svcHandler = service
     ORDER BY SR.creationTime ASC;
  -- ORA-00054 : the row is already locked by someone else (FOR UPDATE NOWAIT)
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
  varSrId NUMBER;
  varRName VARCHAR2(100);
  varClientId NUMBER;
  varUnusedMessage VARCHAR2(2048);
  varUnusedStatus INTEGER;
BEGIN
  -- Open a cursor on potential candidates
  OPEN SRcur;
  -- Retrieve the first candidate
  FETCH SRcur INTO varSrId;
  IF SRcur%NOTFOUND THEN
    -- There is no candidate available. Wait for next alert for a maximum of 3 seconds.
    -- We do not wait forever in order to give the control back to the
    -- caller daemon in case it should exit.
    CLOSE SRcur;
    DBMS_ALERT.WAITONE('wakeUp'||service, varUnusedMessage, varUnusedStatus, 3);
    -- try again to find something now that we waited
    OPEN SRcur;
    FETCH SRcur INTO varSrId;
    IF SRcur%NOTFOUND THEN
      -- still nothing. We will give back the control to the application
      -- so that it can handle cases like signals and exit. We will probably
      -- be back soon :-)
      CLOSE SRcur;
      RETURN;
    END IF;
  END IF;
  -- Loop on candidates until we can lock one
  LOOP
    BEGIN
      -- Try to take a lock on the current candidate, and revalidate its status
      SELECT /*+ INDEX(SR PK_SubRequest_ID) */ id INTO varSrId
        FROM SubRequest PARTITION (P_STATUS_0_1_2) SR
       WHERE id = varSrId FOR UPDATE NOWAIT;
      -- Since we are here, we got the lock. We have our winner, let's update it
      -- and fetch the name of the request table (Type2Obj) for the lookup below
      UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
         SET status = dconst.SUBREQUEST_WAITSCHED, subReqId = nvl(subReqId, uuidGen())
       WHERE id = varSrId
      RETURNING id, retryCounter, fileName, protocol, xsize, modeBits, flags, subReqId,
        reqType, request, (SELECT object FROM Type2Obj WHERE type = reqType)
        INTO srId, srRetryCounter, srFileName, srProtocol, srXsize, srModeBits, srFlags, srSubReqId,
        srReqType, rId, varRName;
      EXIT;
    EXCEPTION
      WHEN NO_DATA_FOUND THEN
        -- Go to next candidate, this subrequest was processed already and its status changed
        NULL;
      WHEN SrLocked THEN
        -- Go to next candidate, this subrequest is being processed by another thread
        NULL;
    END;
    -- we are here because the current candidate could not be handled
    -- let's go to the next one
    FETCH SRcur INTO varSrId;
    IF SRcur%NOTFOUND THEN
      -- no next one ? then we can return
      CLOSE SRcur;
      RETURN;
    END IF;
  END LOOP;
  CLOSE SRcur;

  BEGIN
    -- XXX This could be done in a single EXECUTE IMMEDIATE statement, but to make it
    -- XXX efficient we implement a CASE construct. At a later time the FileRequests should
    -- XXX be merged in a single table (partitioned by reqType) to avoid the following block.
    -- An unhandled request name raises CASE_NOT_FOUND, caught by the handler below.
    CASE
      WHEN varRName = 'StagePutDoneRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StagePutDoneRequest WHERE id = rId;
      WHEN varRName = 'StageRmRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StageRmRequest WHERE id = rId;
      WHEN varRName = 'SetFileGCWeight' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, weight, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, rGcWeight, varClientId
          FROM SetFileGCWeight WHERE id = rId;
    END CASE;
    SELECT ipAddress, port, version
      INTO clIpAddress, clPort, clVersion
      FROM Client WHERE id = varClientId;
  EXCEPTION WHEN OTHERS THEN
    -- Something went really wrong, our subrequest does not have the corresponding request or client,
    -- Just drop it and re-raise exception. Some rare occurrences have happened in the past,
    -- this catch-all logic protects the stager-scheduling system from getting stuck with a single such case.
    archiveSubReq(varSrId, dconst.SUBREQUEST_FAILED_FINISHED);
    COMMIT;
    raise_application_error(-20100, 'Request got corrupted and could not be processed : ' ||
                                    SQLCODE || ' -ERROR- ' || SQLERRM);
  END;
END;
/

/* PL/SQL method to fail a SubRequest waiting on a tape recall.
 * The SubRequest is set to FAILED. If no other SubRequest of the same
 * CastorFile is still in WAITTAPERECALL, the recall itself is dropped
 * through deleteRecallJobs.
 * @param inSrId id of the SubRequest to fail
 * @param inCfId id of its CastorFile
 */
CREATE OR REPLACE PROCEDURE failRecallSubReq(inSrId IN INTEGER, inCfId IN INTEGER) AS
  varStillWaiting INTEGER;
BEGIN
  UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
     SET status = dconst.SUBREQUEST_FAILED
   WHERE id = inSrId;
  -- is anybody else still waiting for this recall ?
  SELECT COUNT(*) INTO varStillWaiting
    FROM SubRequest
   WHERE status = dconst.SUBREQUEST_WAITTAPERECALL
     AND castorFile = inCfId;
  IF varStillWaiting > 0 THEN
    -- yes, keep the recall going
    RETURN;
  END IF;
  -- nobody left, the recall is useless
  deleteRecallJobs(inCfId);
END;
/

/* PL/SQL method to process bulk abort on a given get/prepareToGet request.
 * Revalidates the status of the targeted SubRequest, aborts it when it is
 * still abortable, and records the outcome for the file in
 * ProcessBulkRequestHelper (errorCode 0 on success).
 * @param sr the SubRequest to abort, as found in processBulkAbortFileReqsHelper
 */
CREATE OR REPLACE PROCEDURE processAbortForGet(sr processBulkAbortFileReqsHelper%ROWTYPE) AS
  abortedSRstatus NUMBER;
BEGIN
  -- note the revalidation of the status and even of the existence of the subrequest
  -- as it may have changed before we got the lock on the Castorfile in processBulkAbortFileReqs.
  -- The NO_DATA_FOUND handler is scoped to this lookup only, so that errors
  -- raised while aborting are not misreported as a deleted subRequest.
  BEGIN
    SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ status
      INTO abortedSRstatus
      FROM SubRequest
     WHERE id = sr.srId;
  EXCEPTION WHEN NO_DATA_FOUND THEN
    -- subRequest was deleted in the mean time !
    INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
    VALUES (sr.fileId, sr.nsHost, serrno.ENOENT, 'Targeted SubRequest has just been deleted');
    RETURN;
  END;
  CASE
    WHEN abortedSRstatus IN (dconst.SUBREQUEST_START,
                             dconst.SUBREQUEST_RESTART,
                             dconst.SUBREQUEST_RETRY,
                             dconst.SUBREQUEST_WAITSCHED,
                             dconst.SUBREQUEST_WAITSUBREQ,
                             dconst.SUBREQUEST_READY,
                             dconst.SUBREQUEST_REPACK,
                             dconst.SUBREQUEST_READYFORSCHED) THEN
      -- standard case, we only have to fail the subrequest
      UPDATE SubRequest SET status = dconst.SUBREQUEST_FAILED WHERE id = sr.srId;
      INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
      VALUES (sr.fileId, sr.nsHost, 0, '');
    WHEN abortedSRstatus = dconst.SUBREQUEST_WAITTAPERECALL THEN
      -- fail the subrequest, and the recall if nobody else waits for it
      failRecallSubReq(sr.srId, sr.cfId);
      INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
      VALUES (sr.fileId, sr.nsHost, 0, '');
    WHEN abortedSRstatus IN (dconst.SUBREQUEST_FAILED,
                             dconst.SUBREQUEST_FAILED_FINISHED) THEN
      -- subrequest has failed, nothing to abort
      INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
      VALUES (sr.fileId, sr.nsHost, serrno.EINVAL, 'Cannot abort failed subRequest');
    WHEN abortedSRstatus IN (dconst.SUBREQUEST_FINISHED,
                             dconst.SUBREQUEST_ARCHIVED) THEN
      -- subrequest is over, nothing to abort
      INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
      VALUES (sr.fileId, sr.nsHost, serrno.EINVAL, 'Cannot abort completed subRequest');
    ELSE
      -- unknown status !
      INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
      VALUES (sr.fileId, sr.nsHost, serrno.SEINTERNAL, 'Found unknown status for request : ' || TO_CHAR(abortedSRstatus));
  END CASE;
END;
/

/* PL/SQL method to process bulk abort on a given put/prepareToPut request.
 *
 * sr describes the subrequest to abort (srId, cfId, fileId, nsHost, uuid).
 * The subrequest status is read again here because it may have changed, or the
 * subrequest may have been deleted, before processBulkAbortFileReqs obtained the
 * lock on the CastorFile. On a successful abort the subrequest is failed and the
 * CastorFile's STAGEOUT/WAITFS/WAITFS_SCHEDULING DiskCopies are marked FAILED
 * (WAITFS*) or INVALID (STAGEOUT). Exactly one outcome row is written to
 * ProcessBulkRequestHelper for sr (errorCode 0 when the abort succeeded).
 */
CREATE OR REPLACE PROCEDURE processAbortForPut(sr processBulkAbortFileReqsHelper%ROWTYPE) AS
  varSrStatus NUMBER;
  -- Record the abort outcome for the file targeted by sr
  PROCEDURE reportOutcome(inErrCode IN NUMBER, inErrMsg IN VARCHAR2) IS
  BEGIN
    INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
    VALUES (sr.fileId, sr.nsHost, inErrCode, inErrMsg);
  END reportOutcome;
BEGIN
  SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ status
    INTO varSrStatus
    FROM SubRequest
   WHERE id = sr.srId;
  IF varSrStatus IN (dconst.SUBREQUEST_START, dconst.SUBREQUEST_RESTART,
                     dconst.SUBREQUEST_RETRY, dconst.SUBREQUEST_WAITSCHED,
                     dconst.SUBREQUEST_WAITSUBREQ, dconst.SUBREQUEST_READY,
                     dconst.SUBREQUEST_REPACK, dconst.SUBREQUEST_READYFORSCHED) THEN
    -- standard case: fail the subrequest ...
    UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
       SET status = dconst.SUBREQUEST_FAILED
     WHERE id = sr.srId;
    -- ... and drop the diskcopies being written for this file
    UPDATE DiskCopy
       SET status = CASE
                      WHEN status IN (dconst.DISKCOPY_WAITFS,
                                      dconst.DISKCOPY_WAITFS_SCHEDULING)
                        THEN dconst.DISKCOPY_FAILED
                      ELSE dconst.DISKCOPY_INVALID
                    END
     WHERE castorfile = sr.cfid
       AND status IN (dconst.DISKCOPY_STAGEOUT,
                      dconst.DISKCOPY_WAITFS,
                      dconst.DISKCOPY_WAITFS_SCHEDULING);
    reportOutcome(0, '');
  ELSIF varSrStatus IN (dconst.SUBREQUEST_FAILED, dconst.SUBREQUEST_FAILED_FINISHED) THEN
    -- already failed, nothing left to abort
    reportOutcome(serrno.EINVAL, 'Cannot abort failed subRequest');
  ELSIF varSrStatus IN (dconst.SUBREQUEST_FINISHED, dconst.SUBREQUEST_ARCHIVED) THEN
    -- already over, nothing left to abort
    reportOutcome(serrno.EINVAL, 'Cannot abort completed subRequest');
  ELSE
    -- any other (or NULL) status is unexpected here
    reportOutcome(serrno.SEINTERNAL, 'Found unknown status for request : ' || TO_CHAR(varSrStatus));
  END IF;
EXCEPTION WHEN NO_DATA_FOUND THEN
  -- the subrequest vanished before we could look at it
  reportOutcome(serrno.ENOENT, 'Targeted SubRequest has just been deleted');
END;
/

733
/* PL/SQL method to process bulk abort on a given Repack request.
 *
 * origReqId is the id of the StageRepackRequest being aborted. Its subrequests
 * still in START/RESTART/RETRY/WAITSUBREQ/WAITTAPERECALL/REPACK are gathered in
 * ProcessBulkAbortFileReqsHelper and handled in batches: the first CastorFile of
 * a batch is locked with a blocking lock, the following ones with NOWAIT, and a
 * batch ends (and is committed) as soon as a NOWAIT lock fails or 1000 items have
 * been processed. Once every item is handled, the request is archived through
 * archiveSubReq.
 */
CREATE OR REPLACE PROCEDURE processBulkAbortForRepack(origReqId IN INTEGER) AS
  abortedSRstatus INTEGER := -1;
  srsToUpdate "numList";
  dcmigrsToUpdate "numList";
  nbItems INTEGER;
  nbItemsDone INTEGER := 0;
  -- ORA-00054: resource busy, raised by the NOWAIT lock below
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
  cfId INTEGER;
  srId INTEGER;
  -- FALSE when the subrequest disappeared before we got the CastorFile lock
  varSrFound BOOLEAN;
  -- 0 or 1: whether any migration job remains for the CastorFile
  varNbMigJobs INTEGER;
  firstOne BOOLEAN := TRUE;
  commitWork BOOLEAN := FALSE;
  varOriginalVID VARCHAR2(2048);
BEGIN
  -- get the VID of the aborted repack request
  SELECT repackVID INTO varOriginalVID FROM StageRepackRequest WHERE id = origReqId;
  -- Gather the list of subrequests to abort. The selection is driven by the
  -- request, hence the I_Subrequest_Request hint (as in processBulkAbortFileReqs)
  INSERT INTO ProcessBulkAbortFileReqsHelper (srId, cfId, fileId, nsHost, uuid) (
    SELECT /*+ INDEX_RS_ASC(Subrequest I_Subrequest_Request)*/
           SubRequest.id, CastorFile.id, CastorFile.fileId, CastorFile.nsHost, SubRequest.subreqId
      FROM SubRequest, CastorFile
     WHERE SubRequest.castorFile = CastorFile.id
       AND request = origReqId
       AND status IN (dconst.SUBREQUEST_START, dconst.SUBREQUEST_RESTART, dconst.SUBREQUEST_RETRY,
                      dconst.SUBREQUEST_WAITSUBREQ, dconst.SUBREQUEST_WAITTAPERECALL,
                      dconst.SUBREQUEST_REPACK));
  SELECT COUNT(*) INTO nbItems FROM processBulkAbortFileReqsHelper;
  -- handle aborts in bulk while avoiding deadlocks
  WHILE nbItems > 0 LOOP
    FOR sr IN (SELECT srId, cfId, fileId, nsHost, uuid FROM processBulkAbortFileReqsHelper) LOOP
      BEGIN
        IF firstOne THEN
          -- on the first item, we take a blocking lock as we are sure that we will not
          -- deadlock and we would like to process at least one item to not loop endlessly
          SELECT id INTO cfId FROM CastorFile WHERE id = sr.cfId FOR UPDATE;
          firstOne := FALSE;
        ELSE
          -- on the other items, we go for a non blocking lock. If we get it, that's
          -- good and we process this extra subrequest within the same session. If
          -- we do not get the lock, then we close the session here and go for a new
          -- one. This will prevent dead locks while ensuring that a minimal number of
          -- commits is performed.
          SELECT id INTO cfId FROM CastorFile WHERE id = sr.cfId FOR UPDATE NOWAIT;
        END IF;
        -- revalidate the status and even the existence of the subrequest, as it may
        -- have changed before we got the lock on the CastorFile. A deleted subrequest
        -- has nothing left to abort: it is simply dropped from the work list instead
        -- of letting NO_DATA_FOUND abort the whole procedure.
        BEGIN
          SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ status
            INTO abortedSRstatus
            FROM SubRequest
           WHERE id = sr.srId;
          varSrFound := TRUE;
        EXCEPTION WHEN NO_DATA_FOUND THEN
          varSrFound := FALSE;
        END;
        IF varSrFound THEN
          -- NOTE(review): this CASE has no ELSE, so a status not listed below (e.g. if
          -- the subrequest moved on to a scheduling state meanwhile) raises
          -- CASE_NOT_FOUND and aborts the whole procedure - TODO confirm such
          -- statuses cannot be reached here.
          CASE
            WHEN abortedSRstatus = dconst.SUBREQUEST_START
              OR abortedSRstatus = dconst.SUBREQUEST_RESTART
              OR abortedSRstatus = dconst.SUBREQUEST_RETRY
              OR abortedSRstatus = dconst.SUBREQUEST_WAITSUBREQ THEN
              -- easy case, we only have to fail the subrequest
              INSERT INTO ProcessRepackAbortHelperSR (srId) VALUES (sr.srId);
            WHEN abortedSRstatus = dconst.SUBREQUEST_WAITTAPERECALL THEN
              -- recall case, fail the subRequest and cancel the recall if needed
              failRecallSubReq(sr.srId, sr.cfId);
            WHEN abortedSRstatus = dconst.SUBREQUEST_REPACK THEN
              -- trigger the update the subrequest status to FAILED
              INSERT INTO ProcessRepackAbortHelperSR (srId) VALUES (sr.srId);
              -- delete migration jobs of this repack, hence stopping selectively the migrations
              DELETE FROM MigrationJob WHERE castorfile = sr.cfId AND originalVID = varOriginalVID;
              -- delete migrated segments if no migration jobs remain
              SELECT COUNT(*) INTO varNbMigJobs
                FROM MigrationJob
               WHERE castorfile = sr.cfId AND ROWNUM < 2;
              IF varNbMigJobs = 0 THEN
                DELETE FROM MigratedSegment WHERE castorfile = sr.cfId;
                -- trigger the update of the CastorFile's tapeStatus to ONTAPE as no more migrations remain
                INSERT INTO ProcessRepackAbortHelperDCmigr (cfId) VALUES (sr.cfId);
              END IF;
            WHEN abortedSRstatus IN (dconst.SUBREQUEST_FAILED,
                                     dconst.SUBREQUEST_FINISHED,
                                     dconst.SUBREQUEST_FAILED_FINISHED,
                                     dconst.SUBREQUEST_ARCHIVED) THEN
              -- nothing to be done here
              NULL;
          END CASE;
        END IF;
        DELETE FROM processBulkAbortFileReqsHelper WHERE srId = sr.srId;
        nbItemsDone := nbItemsDone + 1;
      EXCEPTION WHEN SrLocked THEN
        commitWork := TRUE;
      END;
      -- commit anyway from time to time, to avoid too long redo logs
      IF commitWork OR nbItemsDone >= 1000 THEN
        -- exit the current loop and restart a new one, in order to commit without getting invalid ROWID errors
        EXIT;
      END IF;
    END LOOP;
    -- do the bulk updates. NOTE(review): the helper tables below are not emptied
    -- explicitly between batches; presumably they are temporary tables cleared on
    -- COMMIT - TODO confirm against their DDL.
    SELECT srId BULK COLLECT INTO srsToUpdate FROM ProcessRepackAbortHelperSR;
    FORALL i IN 1 .. srsToUpdate.COUNT
      UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
         SET diskCopy = NULL, lastModificationTime = getTime(),
             status = dconst.SUBREQUEST_FAILED_FINISHED,
             errorCode = 1701, errorMessage = 'Aborted explicitely'  -- ESTCLEARED
       WHERE id = srsToUpdate(i);
    SELECT cfId BULK COLLECT INTO dcmigrsToUpdate FROM ProcessRepackAbortHelperDCmigr;
    FORALL i IN 1 .. dcmigrsToUpdate.COUNT
      UPDATE CastorFile SET tapeStatus = dconst.CASTORFILE_ONTAPE WHERE id = dcmigrsToUpdate(i);
    -- commit
    COMMIT;
    -- reset all counters
    nbItems := nbItems - nbItemsDone;
    nbItemsDone := 0;
    firstOne := TRUE;
    commitWork := FALSE;
  END LOOP;
  -- archive the request
  BEGIN
    SELECT id, status INTO srId, abortedSRstatus
      FROM SubRequest
     WHERE request = origReqId
       AND status IN (dconst.SUBREQUEST_FINISHED, dconst.SUBREQUEST_FAILED_FINISHED)
       AND ROWNUM = 1;
    -- This procedure should really be called 'terminateSubReqAndArchiveRequest', and this is
    -- why we call it here: we need to trigger the logic to mark the whole request and all of its subrequests
    -- as ARCHIVED, so that they are cleaned up afterwards. Note that this is effectively
    -- a no-op for the status change of the single fetched SubRequest.
    archiveSubReq(srId, abortedSRstatus);
  EXCEPTION WHEN NO_DATA_FOUND THEN
    -- Should never happen, anyway ignore as there's nothing else to do
    NULL;
  END;
  COMMIT;
END;
/

864
865
/* PL/SQL method to process bulk abort on files related requests.
 *
 * origReqId : id of the request whose subrequests are aborted
 * fileIds   : fileids to abort; an empty list means every subrequest of the request
 * nsHosts   : name server hosts, matched with fileIds by index
 * reqType   : 1 dispatches to processAbortForGet, 2 to processAbortForPut
 *
 * Fileids with no matching subrequest are reported in ProcessBulkRequestHelper
 * with ENOENT. Work items are processed in batches: the first CastorFile of a
 * batch is locked with a blocking lock, the following ones with NOWAIT, and a
 * batch ends as soon as a NOWAIT lock fails or 1000 items are done. Each batch
 * is committed and the scheduler is then signalled on 'transfersToAbort'.
 */
CREATE OR REPLACE PROCEDURE processBulkAbortFileReqs
(origReqId IN INTEGER, fileIds IN "numList", nsHosts IN strListTable, reqType IN NUMBER) AS
  varNbPending NUMBER;
  varNbDone NUMBER := 0;
  -- ORA-00054, raised when the NOWAIT lock on a CastorFile cannot be obtained
  CfBusy EXCEPTION;
  PRAGMA EXCEPTION_INIT (CfBusy, -54);
  varLockedCfId NUMBER;
  varBlockingLock BOOLEAN := TRUE;
  varEndOfBatch BOOLEAN := FALSE;
BEGIN
  IF fileIds.count() > 0 THEN
    -- selective abort: resolve each fileid/nsHost pair to a subrequest of the request
    FOR i IN fileIds.FIRST .. fileIds.LAST LOOP
      DECLARE
        varSrId NUMBER;
        varCfId NUMBER;
        varSrUuid VARCHAR(2048);
      BEGIN
        SELECT /*+ INDEX_RS_ASC(Subrequest I_Subrequest_CastorFile)*/
               SubRequest.id, CastorFile.id, SubRequest.subreqId
          INTO varSrId, varCfId, varSrUuid
          FROM SubRequest, CastorFile
         WHERE request = origReqId
           AND SubRequest.castorFile = CastorFile.id
           AND CastorFile.fileid = fileIds(i)
           AND CastorFile.nsHost = nsHosts(i);
        INSERT INTO processBulkAbortFileReqsHelper (srId, cfId, fileId, nsHost, uuid)
        VALUES (varSrId, varCfId, fileIds(i), nsHosts(i), varSrUuid);
      EXCEPTION WHEN NO_DATA_FOUND THEN
        -- this fileid/nshost is not part of the request: report it back
        INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
        VALUES (fileIds(i), nsHosts(i), serrno.ENOENT, 'No subRequest found for this fileId/nsHost');
      END;
    END LOOP;
  ELSE
    -- empty list: every subrequest of the request is to be aborted
    INSERT INTO ProcessBulkAbortFileReqsHelper (srId, cfId, fileId, nsHost, uuid) (
      SELECT /*+ INDEX_RS_ASC(Subrequest I_Subrequest_Request)*/
             SubRequest.id, CastorFile.id, CastorFile.fileId, CastorFile.nsHost, SubRequest.subreqId
        FROM SubRequest, CastorFile
       WHERE SubRequest.castorFile = CastorFile.id
         AND request = origReqId);
  END IF;
  SELECT COUNT(*) INTO varNbPending FROM processBulkAbortFileReqsHelper;
  -- one iteration per batch; each batch ends with a commit
  WHILE varNbPending > 0 LOOP
    FOR abortItem IN (SELECT srId, cfId, fileId, nsHost, uuid FROM processBulkAbortFileReqsHelper) LOOP
      BEGIN
        -- A blocking lock on the batch's first CastorFile guarantees progress and
        -- cannot deadlock; later ones use NOWAIT so that contention ends the batch
        -- (commit, release the locks, start over) instead of risking a deadlock.
        IF varBlockingLock THEN
          SELECT id INTO varLockedCfId FROM CastorFile WHERE id = abortItem.cfId FOR UPDATE;
          varBlockingLock := FALSE;
        ELSE
          SELECT id INTO varLockedCfId FROM CastorFile WHERE id = abortItem.cfId FOR UPDATE NOWAIT;
        END IF;
        -- the CastorFile is locked: perform the type specific abort
        CASE
          WHEN reqType = 1 THEN processAbortForGet(abortItem);
          WHEN reqType = 2 THEN processAbortForPut(abortItem);
        END CASE;
        DELETE FROM processBulkAbortFileReqsHelper WHERE srId = abortItem.srId;
        -- let the scheduler drop the corresponding transfer from its queues if needed
        INSERT INTO TransfersToAbort (uuid) VALUES (abortItem.uuid);
        varNbDone := varNbDone + 1;
      EXCEPTION WHEN CfBusy THEN
        varEndOfBatch := TRUE;
      END;
      -- leave the cursor loop before committing (avoids invalid ROWID errors);
      -- also bounds the batch size to keep redo logs short
      EXIT WHEN varEndOfBatch OR varNbDone >= 1000;
    END LOOP;
    COMMIT;
    -- wake up the scheduler so that it can remove the transfers from its queues
    alertSignalNoLock('transfersToAbort');
    -- prepare the next batch
    varNbPending := varNbPending - varNbDone;
    varNbDone := 0;
    varBlockingLock := TRUE;
    varEndOfBatch := FALSE;
  END LOOP;
END;
/

/* PL/SQL method to process bulk abort requests */
CREATE OR REPLACE PROCEDURE processBulkAbort(abortReqId IN INTEGER, rIpAddress OUT INTEGER,
                                             rport OUT INTEGER, rReqUuid OUT VARCHAR2) AS
  clientId NUMBER;
  reqType NUMBER;
  requestId NUMBER;
  abortedReqUuid VARCHAR(2048);
  fileIds "numList";
  nsHosts strListTable;
  ids "numList";
967
  nsHostName VARCHAR2(2048);
968
BEGIN
969
970
  -- get the stager/nsHost configuration option
  nsHostName := getConfigOption('stager', 'nsHost', '');
971
972
973
974
975
  -- get request and client informations and drop them from the DB
  DELETE FROM StageAbortRequest WHERE id = abortReqId
    RETURNING reqId, parentUuid, client INTO rReqUuid, abortedReqUuid, clientId;
  DELETE FROM Client WHERE id = clientId
    RETURNING ipAddress, port INTO rIpAddress, rport;
976
977
978
979
  -- list fileids to process and drop them from the DB; override the
  -- nsHost in case it is defined in the configuration
  SELECT fileid, decode(nsHostName, '', nsHost, nsHostName), id
    BULK COLLECT INTO fileIds, nsHosts, ids
980
    FROM NsFileId WHERE request = abortReqId;
981
  FORALL i IN 1 .. ids.COUNT DELETE FROM NsFileId WHERE id = ids(i);
982
983
984
  -- dispatch actual processing depending on request type
  BEGIN
    SELECT rType, id INTO reqType, requestId FROM
985
986
987
988
989
990
991
      (SELECT /*+ INDEX(StageGetRequest I_StageGetRequest_ReqId) */
              reqId, id, 1 as rtype from StageGetRequest UNION ALL
       SELECT /*+ INDEX(StagePrepareToGetRequest I_StagePTGRequest_ReqId) */
              reqId, id, 1 as rtype from StagePrepareToGetRequest UNION ALL
       SELECT /*+ INDEX(stagePutRequest I_stagePutRequest_ReqId) */
              reqId, id, 2 as rtype from StagePutRequest UNION ALL
       SELECT /*+ INDEX(StagePrepareToPutRequest I_StagePTPRequest_ReqId) */
992
993
994
              reqId, id, 2 as rtype from StagePrepareToPutRequest UNION ALL
       SELECT /*+ INDEX(StageRepackRequest I_RepackRequest_ReqId) */
              reqId, id, 3 as rtype from StageRepackRequest)
995
996
997
     WHERE reqId = abortedReqUuid;
  EXCEPTION WHEN NO_DATA_FOUND THEN
    -- abort on non supported request type
998
    INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
999
    VALUES (0, '', serrno.ENOENT, 'Request not found, or abort not supported for this request type');
1000
    RETURN;