oracleStager.sql 154 KB
Newer Older
1
/*******************************************************************
2
 *
3
 * PL/SQL code for the stager and resource monitoring
4
5
6
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *******************************************************************/
7

8
/* PL/SQL declaration for the castor package.
 * The package only declares types: record types, each paired with a
 * REF CURSOR type returning it, plus two associative array types.
 * String fields consistently use VARCHAR2, which Oracle recommends over
 * VARCHAR (VARCHAR is reserved for possible future semantic changes).
 */
CREATE OR REPLACE PACKAGE castor AS
  TYPE DiskCopyCore IS RECORD (
    id INTEGER,
    path VARCHAR2(2048),
    status NUMBER,
    fsWeight NUMBER,
    mountPoint VARCHAR2(2048),
    diskServer VARCHAR2(2048));
  TYPE DiskCopy_Cur IS REF CURSOR RETURN DiskCopyCore;

  -- Associative arrays indexed by binary_integer
  TYPE "strList" IS TABLE OF VARCHAR2(2048) index BY binary_integer;
  TYPE "cnumList" IS TABLE OF NUMBER index BY binary_integer;

  TYPE QueryLine IS RECORD (
    fileid INTEGER,
    nshost VARCHAR2(2048),
    diskCopyId INTEGER,
    diskCopyPath VARCHAR2(2048),
    filesize INTEGER,
    diskCopyStatus INTEGER,
    diskServerName VARCHAR2(2048),
    fileSystemMountPoint VARCHAR2(2048),
    nbaccesses INTEGER,
    lastKnownFileName VARCHAR2(2048),
    creationTime INTEGER,
    svcClass VARCHAR2(2048),
    lastAccessTime INTEGER,
    hwStatus INTEGER);
  TYPE QueryLine_Cur IS REF CURSOR RETURN QueryLine;

  TYPE FileList IS RECORD (
    fileId NUMBER,
    nsHost VARCHAR2(2048));
  TYPE FileList_Cur IS REF CURSOR RETURN FileList;

  TYPE DiskPoolQueryLine IS RECORD (
    isDP INTEGER,
    isDS INTEGER,
    diskServerName VARCHAR2(2048),
    diskServerStatus INTEGER,
    fileSystemmountPoint VARCHAR2(2048),
    fileSystemfreeSpace INTEGER,
    fileSystemtotalSpace INTEGER,
    fileSystemminfreeSpace INTEGER,
    fileSystemmaxFreeSpace INTEGER,
    fileSystemStatus INTEGER);
  TYPE DiskPoolQueryLine_Cur IS REF CURSOR RETURN DiskPoolQueryLine;

  TYPE DiskPoolsQueryLine IS RECORD (
    isDP INTEGER,
    isDS INTEGER,
    diskPoolName VARCHAR2(2048),
    diskServerName VARCHAR2(2048),
    diskServerStatus INTEGER,
    fileSystemmountPoint VARCHAR2(2048),
    fileSystemfreeSpace INTEGER,
    fileSystemtotalSpace INTEGER,
    fileSystemminfreeSpace INTEGER,
    fileSystemmaxFreeSpace INTEGER,
    fileSystemStatus INTEGER);
  TYPE DiskPoolsQueryLine_Cur IS REF CURSOR RETURN DiskPoolsQueryLine;

  TYPE IDRecord IS RECORD (id INTEGER);
  TYPE IDRecord_Cur IS REF CURSOR RETURN IDRecord;

  TYPE UUIDRecord IS RECORD (uuid VARCHAR2(2048));
  TYPE UUIDRecord_Cur IS REF CURSOR RETURN UUIDRecord;

  TYPE UUIDPairRecord IS RECORD (uuid1 VARCHAR2(2048), uuid2 VARCHAR2(2048));
  TYPE UUIDPairRecord_Cur IS REF CURSOR RETURN UUIDPairRecord;

  TYPE TransferRecord IS RECORD (subreId VARCHAR2(2048), resId VARCHAR2(2048), fileId NUMBER, nsHost VARCHAR2(2048));
  TYPE TransferRecord_Cur IS REF CURSOR RETURN TransferRecord;

  TYPE StringValue IS RECORD (value VARCHAR2(2048));
  TYPE StringList_Cur IS REF CURSOR RETURN StringValue;

  TYPE FileEntry IS RECORD (
    fileid INTEGER,
    nshost VARCHAR2(2048));
  TYPE FileEntry_Cur IS REF CURSOR RETURN FileEntry;

  TYPE TapeAccessPriority IS RECORD (
    euid INTEGER,
    egid INTEGER,
    priority INTEGER);
  TYPE TapeAccessPriority_Cur IS REF CURSOR RETURN TapeAccessPriority;

  TYPE StreamReport IS RECORD (
    diskserver VARCHAR2(2048),
    mountPoint VARCHAR2(2048));
  TYPE StreamReport_Cur IS REF CURSOR RETURN StreamReport;

  TYPE FileResult IS RECORD (
    fileid INTEGER,
    nshost VARCHAR2(2048),
    errorcode INTEGER,
    errormessage VARCHAR2(2048));
  TYPE FileResult_Cur IS REF CURSOR RETURN FileResult;

  TYPE DiskCopyResult IS RECORD (
    dcId INTEGER,
    retCode INTEGER);
  TYPE DiskCopyResult_Cur IS REF CURSOR RETURN DiskCopyResult;

  TYPE LogEntry IS RECORD (
    timeinfo NUMBER,
    uuid VARCHAR2(2048),
    priority INTEGER,
    msg VARCHAR2(2048),
    fileId NUMBER,
    nsHost VARCHAR2(2048),
    source VARCHAR2(2048),
    params VARCHAR2(2048));
  TYPE LogEntry_Cur IS REF CURSOR RETURN LogEntry;
END castor;
/
110

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/* Keeps FileSystemsToCheck in step with FileSystem: each FileSystem being
 * created gets a companion FileSystemsToCheck row with ToBeChecked = 0 */
CREATE OR REPLACE TRIGGER tr_FileSystem_Insert
BEFORE INSERT ON FileSystem
FOR EACH ROW
BEGIN
  INSERT INTO FileSystemsToCheck (ToBeChecked, FileSystem)
  VALUES (0, :new.id);
END;
/

/* Drops the FileSystemsToCheck row(s) belonging to a FileSystem
 * that is being deleted */
CREATE OR REPLACE TRIGGER tr_FileSystem_Delete
BEFORE DELETE ON FileSystem
FOR EACH ROW
BEGIN
  DELETE FROM FileSystemsToCheck
   WHERE FileSystem = :old.id;
END;
/
Giulia Taurelli's avatar
Giulia Taurelli committed
130

131
132
133
134
/* Checks consistency of DiskCopies when a FileSystem comes
 * back in production after a period spent in a DRAINING or a
 * DISABLED status.
 * Current checks/fixes include :
 *   - Canceling recalls for files that are VALID
 *     on the fileSystem that comes back. (Scheduled for bulk
 *     operation: the filesystem is only flagged in FileSystemsToCheck here)
 *   - Dealing with files that are STAGEOUT on the fileSystem
 *     coming back but already exist on another one
 *
 * @param fsId id of the FileSystem coming back into production
 */
CREATE OR REPLACE PROCEDURE checkFSBackInProd(fsId NUMBER) AS
BEGIN
  -- Flag the filesystem for processing in a bulk operation later.
  -- We need to do this because some operations are database intensive
  -- and therefore it is often better to process several filesystems
  -- simultaneous with one query as opposed to one by one. Especially
  -- where full table scans are involved.
  UPDATE FileSystemsToCheck SET toBeChecked = 1
   WHERE fileSystem = fsId;
  -- Invalidate, in a single set-based statement, the diskcopies that are
  -- STAGEOUT on the filesystem coming back to life while the same file is
  -- already VALID/WAITFS/STAGEOUT/WAITFS_SCHEDULING somewhere else.
  -- The IN subquery removes duplicates produced by the self-join, and the
  -- statement level triggers on DiskCopy fire once instead of once per copy.
  -- The decode() form is kept unchanged; presumably it matches the expression
  -- of the I_DiskCopy_Status6 index named in the hint (TODO confirm).
  UPDATE DiskCopy
     SET status = dconst.DISKCOPY_INVALID
   WHERE id IN (SELECT /*+ USE_NL(D E) INDEX(D I_DiskCopy_Status6) */ D.id
                  FROM DiskCopy D, DiskCopy E
                 WHERE D.castorfile = E.castorfile
                   AND D.fileSystem = fsId
                   AND E.fileSystem != fsId
                   AND decode(D.status, 6, D.status, NULL) = dconst.DISKCOPY_STAGEOUT
                   AND E.status IN (dconst.DISKCOPY_VALID,
                                    dconst.DISKCOPY_WAITFS, dconst.DISKCOPY_STAGEOUT,
                                    dconst.DISKCOPY_WAITFS_SCHEDULING));
END;
/

171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/* PL/SQL method implementing bulkCheckFSBackInProd: processes, in one bulk
 * operation, every filesystem flagged in FileSystemsToCheck, in order to
 * optimise database performance.
 * For each file having a VALID diskcopy on one of those filesystems and a
 * pending RecallJob, the recall is cancelled and the subrequests waiting on
 * it are restarted; each file is committed separately.
 */
CREATE OR REPLACE PROCEDURE bulkCheckFSBackInProd AS
  varFsIds "numList";
BEGIN
  -- Reset the flags and, in the same statement, get the list of
  -- filesystems that were scheduled to be checked
  UPDATE FileSystemsToCheck
     SET toBeChecked = 0
   WHERE toBeChecked = 1
  RETURNING fileSystem BULK COLLECT INTO varFsIds;
  -- Recalls only need to be looked at when something was flagged
  IF varFsIds.COUNT > 0 THEN
    -- Files that are VALID on one of the flagged filesystems but still have
    -- a recall: restart their subrequests (reconsidering the recall source)
    FOR recalledFile IN (SELECT UNIQUE DiskCopy.castorFile
                           FROM DiskCopy, RecallJob
                          WHERE DiskCopy.castorfile = RecallJob.castorfile
                            AND DiskCopy.fileSystem IN
                                  (SELECT /*+ CARDINALITY(fsidTable 5) */ COLUMN_VALUE
                                     FROM TABLE(varFsIds) fsidTable)
                            AND DiskCopy.status = dconst.DISKCOPY_VALID) LOOP
      deleteRecallJobs(recalledFile.castorFile);
      UPDATE SubRequest
         SET status = dconst.SUBREQUEST_RESTART
       WHERE castorFile = recalledFile.castorFile
         AND status = dconst.SUBREQUEST_WAITTAPERECALL;
      -- one commit per file
      COMMIT;
    END LOOP;
  END IF;
END;
/
208
209


210
/* Update trigger on FileSystem.status: when a filesystem moves into
 * PRODUCTION from another status, a consistency check of the diskcopies
 * residing on it is initiated through checkFsBackInProd */
CREATE OR REPLACE TRIGGER tr_FileSystem_Update
BEFORE UPDATE OF status ON FileSystem
FOR EACH ROW WHEN (old.status != new.status)
BEGIN
  IF :new.status = dconst.FILESYSTEM_PRODUCTION
     AND :old.status != dconst.FILESYSTEM_PRODUCTION THEN
    checkFsBackInProd(:old.id);
  END IF;
END;
/
223

224

225
/* Update trigger on DiskServer.status: when an online diskserver
 * (hwOnline = 1) moves into PRODUCTION from another status, a consistency
 * check is initiated for each of its filesystems that is in PRODUCTION */
CREATE OR REPLACE TRIGGER tr_DiskServer_Update
BEFORE UPDATE OF status ON DiskServer
FOR EACH ROW WHEN (old.status != new.status)
BEGIN
  IF :new.status = dconst.DISKSERVER_PRODUCTION
     AND :old.status != dconst.DISKSERVER_PRODUCTION
     AND :new.hwOnline = 1 THEN
    FOR prodFs IN (SELECT id
                     FROM FileSystem
                    WHERE status = dconst.FILESYSTEM_PRODUCTION
                      AND diskServer = :old.id) LOOP
      checkFsBackInProd(prodFs.id);
    END LOOP;
  END IF;
END;
/
244

Sebastien Ponce's avatar
Sebastien Ponce committed
245

246
/* Trigger used to check if the maxReplicaNb has been exceeded
 * after a diskcopy has changed its status to VALID.
 * It processes the (svcClass, castorFile) pairs accumulated in
 * TooManyReplicasHelper and, for each of them, invalidates the VALID
 * diskcopies in excess of the service class' maxReplicaNb, never
 * dropping the last VALID copy.
 */
CREATE OR REPLACE TRIGGER tr_DiskCopy_Stmt_Online
AFTER UPDATE OF STATUS ON DISKCOPY
DECLARE
  TYPE HelperEntry IS RECORD (
    svcClass   TooManyReplicasHelper.svcClass%TYPE,
    castorFile TooManyReplicasHelper.castorFile%TYPE);
  TYPE HelperEntryList IS TABLE OF HelperEntry;
  varEntries HelperEntryList;
  varSvcClassId TooManyReplicasHelper.svcClass%TYPE;
  varCfId TooManyReplicasHelper.castorFile%TYPE;
  -- Deliberately not named like the SvcClass.maxReplicaNb column, so that
  -- the SQL statements below cannot resolve one for the other
  varMaxReplicaNb NUMBER;
  varUnused NUMBER;
  varNbFiles NUMBER;
BEGIN
  -- Take a snapshot of the pairs to process and empty the helper table right
  -- away. The UPDATE statements issued below fire this very trigger again;
  -- with the rows still present, each nested firing would reprocess the same
  -- files recursively. Emptying it also prevents lines from accumulating,
  -- which would trigger a n^2 behavior until the next commit.
  SELECT svcClass, castorFile
    BULK COLLECT INTO varEntries
    FROM TooManyReplicasHelper;
  DELETE FROM TooManyReplicasHelper;
  -- Loop over the diskcopies to be processed
  FOR i IN 1 .. varEntries.COUNT LOOP
    varSvcClassId := varEntries(i).svcClass;
    varCfId := varEntries(i).castorFile;
    -- Lock the castorfile. This shouldn't be necessary as the procedure that
    -- caused the trigger to be executed should already have the lock.
    -- Nevertheless, we make sure!
    SELECT id INTO varUnused FROM CastorFile
     WHERE id = varCfId FOR UPDATE;
    -- Get the max replica number of the service class
    SELECT maxReplicaNb INTO varMaxReplicaNb
      FROM SvcClass WHERE id = varSvcClassId;
    -- Produce a list of diskcopies to invalidate should too many replicas be
    -- online. Copies whose FileSystem and DiskServer both have status 0
    -- (PRODUCTION) sort first, then by decreasing gcWeight. The first
    -- varMaxReplicaNb copies are kept, so copies on non-PRODUCTION hardware
    -- are the first ones to be invalidated.
    FOR b IN (SELECT id FROM (
                SELECT rownum ind, id FROM (
                  SELECT /*+ INDEX (DiskCopy I_DiskCopy_Castorfile) */ DiskCopy.id
                    FROM DiskCopy, FileSystem, DiskPool2SvcClass, SvcClass,
                         DiskServer
                   WHERE DiskCopy.filesystem = FileSystem.id
                     AND FileSystem.diskpool = DiskPool2SvcClass.parent
                     AND FileSystem.diskserver = DiskServer.id
                     AND DiskPool2SvcClass.child = SvcClass.id
                     AND DiskCopy.castorfile = varCfId
                     AND DiskCopy.status = dconst.DISKCOPY_VALID
                     AND SvcClass.id = varSvcClassId
                   ORDER BY decode(FileSystem.status, 0,
                            decode(DiskServer.status, 0, 0, 1), 1) ASC,
                            DiskCopy.gcWeight DESC))
               WHERE ind > varMaxReplicaNb)
    LOOP
      -- Sanity check, make sure that the last copy is never dropped!
      SELECT /*+ INDEX(DiskCopy I_DiskCopy_CastorFile) */ count(*) INTO varNbFiles
        FROM DiskCopy, FileSystem, DiskPool2SvcClass, SvcClass, DiskServer
       WHERE DiskCopy.filesystem = FileSystem.id
         AND FileSystem.diskpool = DiskPool2SvcClass.parent
         AND FileSystem.diskserver = DiskServer.id
         AND DiskPool2SvcClass.child = SvcClass.id
         AND DiskCopy.castorfile = varCfId
         AND DiskCopy.status = dconst.DISKCOPY_VALID
         AND SvcClass.id = varSvcClassId;
      IF varNbFiles = 1 THEN
        EXIT;  -- Last file, so exit the loop
      END IF;
      -- Invalidate the diskcopy
      UPDATE DiskCopy
         SET status = dconst.DISKCOPY_INVALID,
             gcType = dconst.GCTYPE_TOOMANYREPLICAS
       WHERE id = b.id;
    END LOOP;
  END LOOP;
END;
/
312
313


314
/* Trigger used to provide input to the tr_DiskCopy_Stmt_Online statement
 * level trigger: for each DiskCopy inserted with status 0 (VALID), the
 * (service class, castorFile) pair is recorded in TooManyReplicasHelper.
 * NOTE(review): tr_DiskCopy_Stmt_Online fires AFTER UPDATE OF status only,
 * so pairs recorded here are processed by the next DiskCopy status update,
 * not by this INSERT. Confirm this is intended.
 */
CREATE OR REPLACE TRIGGER tr_DiskCopy_Created
AFTER INSERT ON DiskCopy
FOR EACH ROW
WHEN (new.status = 0) -- dconst.DISKCOPY_VALID
DECLARE
  svcId  NUMBER;
  -- Trap `ORA-00001: unique constraint violated` errors
  CONSTRAINT_VIOLATED EXCEPTION;
  PRAGMA EXCEPTION_INIT(CONSTRAINT_VIOLATED, -00001);
BEGIN
  -- Insert the information about the diskcopy being processed into
  -- the TooManyReplicasHelper. This information will be used later
  -- on the DiskCopy AFTER UPDATE statement level trigger. We cannot
  -- do the work of that trigger here as it would result in
  -- `ORA-04091: table is mutating, trigger/function` errors
  BEGIN
    SELECT SvcClass.id INTO svcId
      FROM FileSystem, DiskPool2SvcClass, SvcClass
     WHERE FileSystem.diskpool = DiskPool2SvcClass.parent
       AND DiskPool2SvcClass.child = SvcClass.id
       AND FileSystem.id = :new.filesystem;
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      -- No service class is attached to the filesystem's diskpool: there is
      -- no replica limit to enforce. Returning here avoids failing the
      -- DiskCopy INSERT itself with an unhandled ORA-01403.
      RETURN;
    WHEN TOO_MANY_ROWS THEN
      -- The filesystem belongs to multiple service classes which is not
      -- supported by the replica management trigger.
      RETURN;
  END;
  -- Insert an entry into the TooManyReplicasHelper table.
  BEGIN
    INSERT INTO TooManyReplicasHelper (svcClass, castorFile)
    VALUES (svcId, :new.castorfile);
  EXCEPTION WHEN CONSTRAINT_VIOLATED THEN
    RETURN;  -- Entry already exists!
  END;
END;
/
Sebastien Ponce's avatar
Sebastien Ponce committed
353

354

355
/* PL/SQL method to get the next SubRequest to do according to the given service.
 * Candidates are the START/RESTART/RETRY subrequests (partition P_STATUS_0_1_2)
 * whose svcHandler is `service`, scanned by increasing creationTime. The first
 * one that can be locked without waiting is moved to WAITSCHED (getting a
 * subReqId if it had none). Its details, together with those of its request
 * and client, are returned through the OUT parameters.
 * When no candidate can be taken, the procedure returns without assigning
 * the OUT parameters.
 * If the request or client of the chosen subrequest cannot be read, the
 * subrequest is archived as FAILED_FINISHED, this is committed, and
 * ORA-20100 is raised.
 */
CREATE OR REPLACE PROCEDURE subRequestToDo(service IN VARCHAR2,
                                           srId OUT INTEGER, srRetryCounter OUT INTEGER, srFileName OUT VARCHAR2,
                                           srProtocol OUT VARCHAR2, srXsize OUT INTEGER,
                                           srModeBits OUT INTEGER, srFlags OUT INTEGER,
                                           srSubReqId OUT VARCHAR2, srAnswered OUT INTEGER, srReqType OUT INTEGER,
                                           rId OUT INTEGER, rFlags OUT INTEGER, rUsername OUT VARCHAR2, rEuid OUT INTEGER,
                                           rEgid OUT INTEGER, rMask OUT INTEGER, rPid OUT INTEGER, rMachine OUT VARCHAR2,
                                           rSvcClassName OUT VARCHAR2, rUserTag OUT VARCHAR2, rReqId OUT VARCHAR2,
                                           rCreationTime OUT INTEGER, rLastModificationTime OUT INTEGER,
                                           rRepackVid OUT VARCHAR2, rGCWeight OUT INTEGER,
                                           clIpAddress OUT INTEGER, clPort OUT INTEGER, clVersion OUT INTEGER) AS
  CURSOR SRcur IS
    SELECT /*+ FIRST_ROWS_10 INDEX(SR I_SubRequest_RT_CT_ID) */ SR.id
      FROM SubRequest PARTITION (P_STATUS_0_1_2) SR  -- START, RESTART, RETRY
     WHERE SR.svcHandler = service
     ORDER BY SR.creationTime ASC;
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
  varSrId NUMBER;
  varRName VARCHAR2(100);
  varClientId NUMBER;
  varErrCode NUMBER;
  varErrMsg VARCHAR2(2048);
BEGIN
  OPEN SRcur;
  -- Loop on candidates until we can lock one
  LOOP
    -- Fetch next candidate
    FETCH SRcur INTO varSrId;
    IF SRcur%NOTFOUND THEN
      -- No candidate: release the cursor and return
      CLOSE SRcur;
      RETURN;
    END IF;
    BEGIN
      -- Try to take a lock on the current candidate, and revalidate its status
      SELECT /*+ INDEX(SR PK_SubRequest_ID) */ id INTO varSrId
        FROM SubRequest PARTITION (P_STATUS_0_1_2) SR
       WHERE id = varSrId FOR UPDATE NOWAIT;
      -- Since we are here, we got the lock. We have our winner, let's update it
      UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
         SET status = dconst.SUBREQUEST_WAITSCHED, subReqId = nvl(subReqId, uuidGen())
       WHERE id = varSrId
      RETURNING id, retryCounter, fileName, protocol, xsize, modeBits, flags, subReqId,
        answered, reqType, request, (SELECT object FROM Type2Obj WHERE type = reqType)
        INTO srId, srRetryCounter, srFileName, srProtocol, srXsize, srModeBits, srFlags, srSubReqId,
        srAnswered, srReqType, rId, varRName;
      EXIT;
    EXCEPTION
      WHEN NO_DATA_FOUND THEN
        -- Got to next candidate, this subrequest was processed already and its status changed
        NULL;
      WHEN SrLocked THEN
        -- Go to next candidate, this subrequest is being processed by another thread
        NULL;
    END;
  END LOOP;
  CLOSE SRcur;

  BEGIN
    -- XXX This could be done in a single EXECUTE IMMEDIATE statement, but to make it
    -- XXX efficient we implement a CASE construct. At a later time the FileRequests should
    -- XXX be merged in a single table (partitioned by reqType) to avoid the following block.
    CASE
      WHEN varRName = 'StagePrepareToPutRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StagePrepareToPutRequest WHERE id = rId;
      WHEN varRName = 'StagePrepareToGetRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StagePrepareToGetRequest WHERE id = rId;
      WHEN varRName = 'StagePrepareToUpdateRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StagePrepareToUpdateRequest WHERE id = rId;
      WHEN varRName = 'StageRepackRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, repackVid, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, rRepackVid, varClientId
          FROM StageRepackRequest WHERE id = rId;
      WHEN varRName = 'StagePutRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StagePutRequest WHERE id = rId;
      WHEN varRName = 'StageGetRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StageGetRequest WHERE id = rId;
      WHEN varRName = 'StageUpdateRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StageUpdateRequest WHERE id = rId;
      WHEN varRName = 'StagePutDoneRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StagePutDoneRequest WHERE id = rId;
      WHEN varRName = 'StageRmRequest' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, varClientId
          FROM StageRmRequest WHERE id = rId;
      WHEN varRName = 'SetFileGCWeight' THEN
        SELECT flags, username, euid, egid, mask, pid, machine, svcClassName, userTag, reqId, creationTime, lastModificationTime, weight, client
          INTO rFlags, rUsername, rEuid, rEgid, rMask, rPid, rMachine, rSvcClassName, rUserTag, rReqId, rCreationTime, rLastModificationTime, rGcWeight, varClientId
          FROM SetFileGCWeight WHERE id = rId;
    END CASE;
    SELECT ipAddress, port, version
      INTO clIpAddress, clPort, clVersion
      FROM Client WHERE id = varClientId;
  EXCEPTION WHEN OTHERS THEN
    -- Capture the original error before running any other statement in this
    -- handler, so that the message below reports what actually went wrong
    varErrCode := SQLCODE;
    varErrMsg := SQLERRM;
    -- Something went really wrong, our subrequest does not have the corresponding request or client,
    -- Just drop it and re-raise exception. Some rare occurrences have happened in the past,
    -- this catch-all logic protects the stager-scheduling system from getting stuck with a single such case.
    archiveSubReq(varSrId, dconst.SUBREQUEST_FAILED_FINISHED);
    COMMIT;
    raise_application_error(-20100, 'Request got corrupted and could not be processed : ' ||
                                    varErrCode || ' -ERROR- ' || varErrMsg);
  END;
END;
/
472

473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
/* PL/SQL method to fail a subrequest in WAITTAPERECALL and, when no other
 * subrequest is left waiting in WAITTAPERECALL for the same file, to cancel
 * the recall itself.
 * @param inSrId id of the SubRequest to fail
 * @param inCfId id of the CastorFile whose recall may be cancelled
 */
CREATE OR REPLACE PROCEDURE failRecallSubReq(inSrId IN INTEGER, inCfId IN INTEGER) AS
  varNbSRs INTEGER;
BEGIN
  -- recall case. First fail the subrequest
  UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
     SET status = dconst.SUBREQUEST_FAILED
   WHERE id = inSrId;
  -- Check whether there are other subRequests waiting for a recall.
  -- Only existence matters, so ROWNUM < 2 stops at the first match and
  -- varNbSRs ends up being 0 or 1.
  SELECT COUNT(*) INTO varNbSRs
    FROM SubRequest
   WHERE castorFile = inCfId
     AND status = dconst.SUBREQUEST_WAITTAPERECALL
     AND ROWNUM < 2;
  IF varNbSRs = 0 THEN
    -- no other subrequests, so drop recalls
    deleteRecallJobs(inCfId);
  END IF;
END;
/

495
496
497
498
499
500
/* PL/SQL method to process bulk abort on a given get/prepareToGet request.
 * The status of the targeted subrequest is re-read. The subrequest is failed
 * when it can still be aborted; in the WAITTAPERECALL case this goes through
 * failRecallSubReq. One line describing the outcome for the file is inserted
 * into ProcessBulkRequestHelper.
 */
CREATE OR REPLACE PROCEDURE processAbortForGet(sr processBulkAbortFileReqsHelper%ROWTYPE) AS
  varSrStatus NUMBER;
  -- Records the abort outcome for the file targeted by sr
  PROCEDURE addResult(inErrCode IN NUMBER, inErrMsg IN VARCHAR2) IS
  BEGIN
    INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
    VALUES (sr.fileId, sr.nsHost, inErrCode, inErrMsg);
  END;
BEGIN
  -- The status, and even the existence, of the subrequest is revalidated, as it
  -- may have changed before we got the lock on the Castorfile in processBulkAbortFileReqs
  SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ status
    INTO varSrStatus
    FROM SubRequest
   WHERE id = sr.srId;
  IF varSrStatus IN (dconst.SUBREQUEST_START, dconst.SUBREQUEST_RESTART,
                     dconst.SUBREQUEST_RETRY, dconst.SUBREQUEST_WAITSCHED,
                     dconst.SUBREQUEST_WAITSUBREQ, dconst.SUBREQUEST_READY,
                     dconst.SUBREQUEST_REPACK, dconst.SUBREQUEST_READYFORSCHED) THEN
    -- standard case, failing the subrequest is enough
    UPDATE SubRequest SET status = dconst.SUBREQUEST_FAILED WHERE id = sr.srId;
    addResult(0, '');
  ELSIF varSrStatus = dconst.SUBREQUEST_WAITTAPERECALL THEN
    -- recall case: fail the subrequest and possibly the recall
    failRecallSubReq(sr.srId, sr.cfId);
    addResult(0, '');
  ELSIF varSrStatus IN (dconst.SUBREQUEST_FAILED, dconst.SUBREQUEST_FAILED_FINISHED) THEN
    -- subrequest has failed, nothing to abort
    addResult(serrno.EINVAL, 'Cannot abort failed subRequest');
  ELSIF varSrStatus IN (dconst.SUBREQUEST_FINISHED, dconst.SUBREQUEST_ARCHIVED) THEN
    -- subrequest is over, nothing to abort
    addResult(serrno.EINVAL, 'Cannot abort completed subRequest');
  ELSE
    -- unknown status !
    addResult(serrno.SEINTERNAL, 'Found unknown status for request : ' || TO_CHAR(varSrStatus));
  END IF;
EXCEPTION WHEN NO_DATA_FOUND THEN
  -- subRequest was deleted in the mean time !
  INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
  VALUES (sr.fileId, sr.nsHost, serrno.ENOENT, 'Targeted SubRequest has just been deleted');
END;
/

/* PL/SQL method to process bulk abort on a given put/prepareToPut request.
 * The status of the targeted subrequest is re-read. When the subrequest can
 * still be aborted, it is failed along with the file's ongoing diskcopies:
 * WAITFS and WAITFS_SCHEDULING become FAILED, STAGEOUT becomes INVALID.
 * One line describing the outcome for the file is inserted into
 * ProcessBulkRequestHelper.
 */
CREATE OR REPLACE PROCEDURE processAbortForPut(sr processBulkAbortFileReqsHelper%ROWTYPE) AS
  varSrStatus NUMBER;
  -- Records the abort outcome for the file targeted by sr
  PROCEDURE addResult(inErrCode IN NUMBER, inErrMsg IN VARCHAR2) IS
  BEGIN
    INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
    VALUES (sr.fileId, sr.nsHost, inErrCode, inErrMsg);
  END;
BEGIN
  -- The status, and even the existence, of the subrequest is revalidated, as it
  -- may have changed before we got the lock on the Castorfile in processBulkAbortFileReqs
  SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ status
    INTO varSrStatus
    FROM SubRequest
   WHERE id = sr.srId;
  IF varSrStatus IN (dconst.SUBREQUEST_START, dconst.SUBREQUEST_RESTART,
                     dconst.SUBREQUEST_RETRY, dconst.SUBREQUEST_WAITSCHED,
                     dconst.SUBREQUEST_WAITSUBREQ, dconst.SUBREQUEST_READY,
                     dconst.SUBREQUEST_REPACK, dconst.SUBREQUEST_READYFORSCHED) THEN
    -- standard case: fail the subrequest and clean up the ongoing diskcopies
    UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
       SET status = dconst.SUBREQUEST_FAILED
     WHERE id = sr.srId;
    UPDATE DiskCopy
       SET status = CASE
                      WHEN status IN (dconst.DISKCOPY_WAITFS,
                                      dconst.DISKCOPY_WAITFS_SCHEDULING)
                        THEN dconst.DISKCOPY_FAILED
                      ELSE dconst.DISKCOPY_INVALID
                    END
     WHERE castorfile = sr.cfid
       AND status IN (dconst.DISKCOPY_STAGEOUT,
                      dconst.DISKCOPY_WAITFS,
                      dconst.DISKCOPY_WAITFS_SCHEDULING);
    addResult(0, '');
  ELSIF varSrStatus IN (dconst.SUBREQUEST_FAILED, dconst.SUBREQUEST_FAILED_FINISHED) THEN
    -- subrequest has failed, nothing to abort
    addResult(serrno.EINVAL, 'Cannot abort failed subRequest');
  ELSIF varSrStatus IN (dconst.SUBREQUEST_FINISHED, dconst.SUBREQUEST_ARCHIVED) THEN
    -- subrequest is over, nothing to abort
    addResult(serrno.EINVAL, 'Cannot abort completed subRequest');
  ELSE
    -- unknown status !
    addResult(serrno.SEINTERNAL, 'Found unknown status for request : ' || TO_CHAR(varSrStatus));
  END IF;
EXCEPTION WHEN NO_DATA_FOUND THEN
  -- subRequest was deleted in the mean time !
  INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
  VALUES (sr.fileId, sr.nsHost, serrno.ENOENT, 'Targeted SubRequest has just been deleted');
END;
/

/* PL/SQL method to process bulk abort on a given Repack request.
 *
 * The SubRequests of the repack request that are still in START, RESTART, RETRY,
 * WAITSUBREQ, WAITTAPERECALL or REPACK status are gathered and aborted:
 *  - START/RESTART/RETRY/WAITSUBREQ and REPACK ones are moved to FAILED_FINISHED
 *    with error 1701 (ESTCLEARED);
 *  - WAITTAPERECALL ones are handed over to failRecallSubReq;
 *  - for REPACK ones, the MigrationJobs created for the repacked tape are dropped
 *    and, if no MigrationJob is left for the file, its MigratedSegments are dropped
 *    and its CastorFile tapeStatus is set back to ONTAPE.
 * CastorFiles are locked one at a time: the first one of each batch with a blocking
 * lock, the others with NOWAIT so that a lock conflict ends the batch instead of
 * risking a deadlock. Each batch (at most 1000 subrequests) is committed.
 * Files whose SubRequest vanished, or moved to a status this procedure does not
 * handle, while waiting for the lock are reported in ProcessBulkRequestHelper.
 * The request is finally archived.
 *
 * @param origReqId id of the StageRepackRequest to abort
 */
CREATE OR REPLACE PROCEDURE processBulkAbortForRepack(origReqId IN INTEGER) AS
  abortedSRstatus INTEGER := -1;
  srsToUpdate "numList";
  dcmigrsToUpdate "numList";
  nbItems INTEGER;
  nbItemsDone INTEGER := 0;
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
  cfId INTEGER;
  srId INTEGER;
  firstOne BOOLEAN := TRUE;
  commitWork BOOLEAN := FALSE;
  srVanished BOOLEAN;
  varOriginalVID VARCHAR2(2048);
BEGIN
  -- get the VID of the aborted repack request
  SELECT repackVID INTO varOriginalVID FROM StageRepackRequest WHERE id = origReqId;
  -- Gather the list of subrequests to abort
  INSERT INTO ProcessBulkAbortFileReqsHelper (srId, cfId, fileId, nsHost, uuid) (
    SELECT /*+ INDEX(Subrequest I_Subrequest_CastorFile)*/
           SubRequest.id, CastorFile.id, CastorFile.fileId, CastorFile.nsHost, SubRequest.subreqId
      FROM SubRequest, CastorFile
     WHERE SubRequest.castorFile = CastorFile.id
       AND request = origReqId
       AND status IN (dconst.SUBREQUEST_START, dconst.SUBREQUEST_RESTART, dconst.SUBREQUEST_RETRY,
                      dconst.SUBREQUEST_WAITSUBREQ, dconst.SUBREQUEST_WAITTAPERECALL,
                      dconst.SUBREQUEST_REPACK));
  SELECT COUNT(*) INTO nbItems FROM processBulkAbortFileReqsHelper;
  -- handle aborts in bulk while avoiding deadlocks
  WHILE nbItems > 0 LOOP
    FOR sr IN (SELECT srId, cfId, fileId, nsHost, uuid FROM processBulkAbortFileReqsHelper) LOOP
      BEGIN
        -- Lock the CastorFile, then revalidate the status and even the existence of the
        -- subrequest, as it may have changed since it was gathered above.
        -- SrLocked is deliberately not caught here: it must reach the outer handler.
        srVanished := FALSE;
        BEGIN
          IF firstOne THEN
            -- on the first item, we take a blocking lock as we are sure that we will not
            -- deadlock and we would like to process at least one item to not loop endlessly
            SELECT id INTO cfId FROM CastorFile WHERE id = sr.cfId FOR UPDATE;
            firstOne := FALSE;
          ELSE
            -- on the other items, we go for a non blocking lock. If we get it, that's
            -- good and we process this extra subrequest within the same session. If
            -- we do not get the lock, then we close the session here and go for a new
            -- one. This will prevent dead locks while ensuring that a minimal number of
            -- commits is performed.
            SELECT id INTO cfId FROM CastorFile WHERE id = sr.cfId FOR UPDATE NOWAIT;
          END IF;
          SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ status
            INTO abortedSRstatus
            FROM SubRequest
           WHERE id = sr.srId;
        EXCEPTION WHEN NO_DATA_FOUND THEN
          -- the CastorFile or the SubRequest was deleted in the mean time
          srVanished := TRUE;
        END;
        IF srVanished THEN
          INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
          VALUES (sr.fileId, sr.nsHost, serrno.ENOENT, 'Targeted SubRequest has just been deleted');
        ELSE
          CASE
            WHEN abortedSRstatus = dconst.SUBREQUEST_START
              OR abortedSRstatus = dconst.SUBREQUEST_RESTART
              OR abortedSRstatus = dconst.SUBREQUEST_RETRY
              OR abortedSRstatus = dconst.SUBREQUEST_WAITSUBREQ THEN
              -- easy case, we only have to fail the subrequest
              INSERT INTO ProcessRepackAbortHelperSR (srId) VALUES (sr.srId);
            WHEN abortedSRstatus = dconst.SUBREQUEST_WAITTAPERECALL THEN
              -- recall case, fail the subRequest and cancel the recall if needed
              failRecallSubReq(sr.srId, sr.cfId);
            WHEN abortedSRstatus = dconst.SUBREQUEST_REPACK THEN
              -- trigger the update the subrequest status to FAILED
              INSERT INTO ProcessRepackAbortHelperSR (srId) VALUES (sr.srId);
              -- delete migration jobs of this repack, hence stopping selectively the migrations
              DELETE FROM MigrationJob WHERE castorfile = sr.cfId AND originalVID = varOriginalVID;
              -- delete migrated segments if no migration jobs remain
              BEGIN
                -- cfId is only used as a scratch target here: we just test for existence
                SELECT id INTO cfId FROM MigrationJob WHERE castorfile = sr.cfId AND ROWNUM < 2;
              EXCEPTION WHEN NO_DATA_FOUND THEN
                DELETE FROM MigratedSegment WHERE castorfile = sr.cfId;
                -- trigger the update of the CastorFile's tapeStatus to ONTAPE as no more migrations remain
                INSERT INTO ProcessRepackAbortHelperDCmigr (cfId) VALUES (sr.cfId);
              END;
            WHEN abortedSRstatus IN (dconst.SUBREQUEST_FAILED,
                                     dconst.SUBREQUEST_FINISHED,
                                     dconst.SUBREQUEST_FAILED_FINISHED,
                                     dconst.SUBREQUEST_ARCHIVED) THEN
              -- nothing to be done here
              NULL;
            ELSE
              -- the subrequest moved to a status we do not know how to abort: report it
              -- rather than letting CASE_NOT_FOUND kill the whole (partially committed) abort
              INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
              VALUES (sr.fileId, sr.nsHost, serrno.SEINTERNAL,
                      'Found unknown status for request : ' || TO_CHAR(abortedSRstatus));
          END CASE;
        END IF;
        -- the item is handled (or reported), drop it from the work list
        DELETE FROM processBulkAbortFileReqsHelper WHERE srId = sr.srId;
        nbItemsDone := nbItemsDone + 1;
      EXCEPTION WHEN SrLocked THEN
        commitWork := TRUE;
      END;
      -- commit anyway from time to time, to avoid too long redo logs
      IF commitWork OR nbItemsDone >= 1000 THEN
        -- exit the current loop and restart a new one, in order to commit without getting invalid ROWID errors
        EXIT;
      END IF;
    END LOOP;
    -- do the bulk updates
    SELECT srId BULK COLLECT INTO srsToUpdate FROM ProcessRepackAbortHelperSR;
    FORALL i IN 1 .. srsToUpdate.COUNT
      UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
         SET diskCopy = NULL, lastModificationTime = getTime(),
             status = dconst.SUBREQUEST_FAILED_FINISHED,
             errorCode = 1701, errorMessage = 'Aborted explicitely'  -- ESTCLEARED
       WHERE id = srsToUpdate(i);
    SELECT cfId BULK COLLECT INTO dcmigrsToUpdate FROM ProcessRepackAbortHelperDCmigr;
    FORALL i IN 1 .. dcmigrsToUpdate.COUNT
      UPDATE CastorFile SET tapeStatus = dconst.CASTORFILE_ONTAPE WHERE id = dcmigrsToUpdate(i);
    -- commit
    COMMIT;
    -- recount what is really left instead of subtracting, so that the outer loop
    -- can never spin on an empty work list because of a counting mismatch
    SELECT COUNT(*) INTO nbItems FROM processBulkAbortFileReqsHelper;
    -- reset all counters
    nbItemsDone := 0;
    firstOne := TRUE;
    commitWork := FALSE;
  END LOOP;
  -- archive the request
  BEGIN
    SELECT id, status INTO srId, abortedSRstatus
      FROM SubRequest
     WHERE request = origReqId
       AND status IN (dconst.SUBREQUEST_FINISHED, dconst.SUBREQUEST_FAILED_FINISHED)
       AND ROWNUM = 1;
    -- This procedure should really be called 'terminateSubReqAndArchiveRequest', and this is
    -- why we call it here: we need to trigger the logic to mark the whole request and all of its subrequests
    -- as ARCHIVED, so that they are cleaned up afterwards. Note that this is effectively
    -- a no-op for the status change of the single fetched SubRequest.
    archiveSubReq(srId, abortedSRstatus);
  EXCEPTION WHEN NO_DATA_FOUND THEN
    -- Should never happen, anyway ignore as there's nothing else to do
    NULL;
  END;
  COMMIT;
END;
/

/* PL/SQL method to process bulk abort on files related requests.
 *
 * Aborts the given files of a request, or all of its subrequests when fileIds is
 * empty. Files that have no subrequest in the request are reported in
 * ProcessBulkRequestHelper with ENOENT. Each subrequest is aborted by
 * processAbortForGet or processAbortForPut while its CastorFile is locked, and its
 * uuid is queued in TransfersToAbort for the scheduler, which is woken up through
 * the 'transfersToAbort' DBMS_ALERT. CastorFiles are locked one at a time: the first
 * one of each batch with a blocking lock, the others with NOWAIT so that a lock
 * conflict ends the batch instead of risking a deadlock. Each batch (at most 1000
 * subrequests) is committed.
 *
 * @param origReqId id of the request to abort
 * @param fileIds   fileids of the files to abort, empty to abort the whole request
 * @param nsHosts   name server of each entry of fileIds (same indexing)
 * @param reqType   1 for Get/PrepareToGet requests, 2 for Put/PrepareToPut requests,
 *                  as computed by processBulkAbort
 */
CREATE OR REPLACE PROCEDURE processBulkAbortFileReqs
(origReqId IN INTEGER, fileIds IN "numList", nsHosts IN strListTable, reqType IN NUMBER) AS
  nbItems NUMBER;
  nbItemsDone NUMBER := 0;
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
  unused NUMBER;
  firstOne BOOLEAN := TRUE;
  commitWork BOOLEAN := FALSE;
BEGIN
  -- Gather the list of subrequests to abort
  IF fileIds.count() = 0 THEN
    -- handle the case of an empty request, meaning that all files should be aborted
    INSERT INTO ProcessBulkAbortFileReqsHelper (srId, cfId, fileId, nsHost, uuid) (
      SELECT /*+ INDEX(Subrequest I_Subrequest_Request)*/
             SubRequest.id, CastorFile.id, CastorFile.fileId, CastorFile.nsHost, SubRequest.subreqId
        FROM SubRequest, CastorFile
       WHERE SubRequest.castorFile = CastorFile.id
         AND request = origReqId);
  ELSE
    -- handle the case of selective abort. A set-based insert is used so that a file
    -- having several subrequests in the request gets all of them aborted, instead of
    -- raising an unhandled TOO_MANY_ROWS as a SELECT INTO would
    FOR i IN fileIds.FIRST .. fileIds.LAST LOOP
      INSERT INTO processBulkAbortFileReqsHelper (srId, cfId, fileId, nsHost, uuid)
        SELECT /*+ INDEX(Subrequest I_Subrequest_CastorFile)*/
               SubRequest.id, CastorFile.id, fileIds(i), nsHosts(i), SubRequest.subreqId
          FROM SubRequest, CastorFile
         WHERE request = origReqId
           AND SubRequest.castorFile = CastorFile.id
           AND CastorFile.fileid = fileIds(i)
           AND CastorFile.nsHost = nsHosts(i);
      IF SQL%ROWCOUNT = 0 THEN
        -- this fileid/nshost did not exist in the request, send an error back
        INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
        VALUES (fileIds(i), nsHosts(i), serrno.ENOENT, 'No subRequest found for this fileId/nsHost');
      END IF;
    END LOOP;
  END IF;
  SELECT COUNT(*) INTO nbItems FROM processBulkAbortFileReqsHelper;
  -- handle aborts in bulk while avoiding deadlocks
  WHILE nbItems > 0 LOOP
    FOR sr IN (SELECT srId, cfId, fileId, nsHost, uuid FROM processBulkAbortFileReqsHelper) LOOP
      BEGIN
        IF firstOne THEN
          -- on the first item, we take a blocking lock as we are sure that we will not
          -- deadlock and we would like to process at least one item to not loop endlessly
          SELECT id INTO unused FROM CastorFile WHERE id = sr.cfId FOR UPDATE;
          firstOne := FALSE;
        ELSE
          -- on the other items, we go for a non blocking lock. If we get it, that's
          -- good and we process this extra subrequest within the same session. If
          -- we do not get the lock, then we close the session here and go for a new
          -- one. This will prevent dead locks while ensuring that a minimal number of
          -- commits is performed.
          SELECT id INTO unused FROM CastorFile WHERE id = sr.cfId FOR UPDATE NOWAIT;
        END IF;
        -- we got the lock on the Castorfile, we can handle the abort for this subrequest
        CASE reqType
          WHEN 1 THEN processAbortForGet(sr);
          WHEN 2 THEN processAbortForPut(sr);
        END CASE;
        DELETE FROM processBulkAbortFileReqsHelper WHERE srId = sr.srId;
        -- make the scheduler aware so that it can remove the transfer from the queues if needed
        INSERT INTO TransfersToAbort (uuid) VALUES (sr.uuid);
        nbItemsDone := nbItemsDone + 1;
      EXCEPTION WHEN SrLocked THEN
        commitWork := TRUE;
      END;
      -- commit anyway from time to time, to avoid too long redo logs
      IF commitWork OR nbItemsDone >= 1000 THEN
        -- exit the current loop and restart a new one, in order to commit without getting invalid ROWID errors
        EXIT;
      END IF;
    END LOOP;
    -- wake up the scheduler so that it can remove the transfer from the queues.
    -- DBMS_ALERT signals are transactional and only fire when the signalling
    -- transaction commits, hence the signal must be issued BEFORE the commit
    DBMS_ALERT.SIGNAL('transfersToAbort', '');
    -- commit
    COMMIT;
    -- recount what is really left instead of subtracting: several helper rows may
    -- share an srId and be deleted together, which would make a running
    -- difference drift and the outer loop spin on an empty work list
    SELECT COUNT(*) INTO nbItems FROM processBulkAbortFileReqsHelper;
    -- reset all counters
    nbItemsDone := 0;
    firstOne := TRUE;
    commitWork := FALSE;
  END LOOP;
END;
/

/* PL/SQL method to process bulk abort requests.
 *
 * Consumes the StageAbortRequest abortReqId together with its Client and NsFileId
 * rows, looks up the request it targets among Get, PrepareToGet, Put, PrepareToPut
 * and Repack requests, and hands the work over to processBulkAbortFileReqs (Get and
 * Put families) or processBulkAbortForRepack (Repack). When the target request is
 * not found, an ENOENT line is added to ProcessBulkRequestHelper.
 *
 * @param abortReqId id of the StageAbortRequest to process
 * @param rIpAddress out: ipAddress of the Client attached to the abort request
 * @param rport      out: port of that Client
 * @param rReqUuid   out: reqId of the abort request itself
 */
CREATE OR REPLACE PROCEDURE processBulkAbort(abortReqId IN INTEGER, rIpAddress OUT INTEGER,
                                             rport OUT INTEGER, rReqUuid OUT VARCHAR2) AS
  varClientId NUMBER;
  varTargetType NUMBER;
  varTargetReqId NUMBER;
  varTargetUuid VARCHAR(2048);
  varFileIds "numList";
  varNsHosts strListTable;
  varNsFileIdRows "numList";
  varNsHostOverride VARCHAR2(2048);
BEGIN
  -- a nsHost defined in the stager configuration takes precedence over the files' one
  varNsHostOverride := getConfigOption('stager', 'nsHost', '');

  -- consume the abort request and its client, keeping what the answer needs
  DELETE FROM StageAbortRequest WHERE id = abortReqId
    RETURNING reqId, parentUuid, client INTO rReqUuid, varTargetUuid, varClientId;
  DELETE FROM Client WHERE id = varClientId
    RETURNING ipAddress, port INTO rIpAddress, rport;

  -- consume the list of files to abort
  SELECT fileid, NVL(varNsHostOverride, nsHost), id
    BULK COLLECT INTO varFileIds, varNsHosts, varNsFileIdRows
    FROM NsFileId
   WHERE request = abortReqId;
  FORALL j IN 1 .. varNsFileIdRows.COUNT
    DELETE FROM NsFileId WHERE id = varNsFileIdRows(j);

  -- identify the targeted request: 1 = get family, 2 = put family, 3 = repack
  BEGIN
    SELECT rType, id INTO varTargetType, varTargetReqId FROM
      (SELECT /*+ INDEX(StageGetRequest I_StageGetRequest_ReqId) */
              reqId, id, 1 as rtype from StageGetRequest UNION ALL
       SELECT /*+ INDEX(StagePrepareToGetRequest I_StagePTGRequest_ReqId) */
              reqId, id, 1 as rtype from StagePrepareToGetRequest UNION ALL
       SELECT /*+ INDEX(stagePutRequest I_stagePutRequest_ReqId) */
              reqId, id, 2 as rtype from StagePutRequest UNION ALL
       SELECT /*+ INDEX(StagePrepareToPutRequest I_StagePTPRequest_ReqId) */
              reqId, id, 2 as rtype from StagePrepareToPutRequest UNION ALL
       SELECT /*+ INDEX(StageRepackRequest I_RepackRequest_ReqId) */
              reqId, id, 3 as rtype from StageRepackRequest)
     WHERE reqId = varTargetUuid;
  EXCEPTION WHEN NO_DATA_FOUND THEN
    -- unknown request, or a request type for which abort is not supported
    INSERT INTO ProcessBulkRequestHelper (fileId, nsHost, errorCode, errorMessage)
    VALUES (0, '', serrno.ENOENT, 'Request not found, or abort not supported for this request type');
    RETURN;
  END;

  -- dispatch to the type specific processing
  CASE
    WHEN varTargetType IN (1, 2) THEN
      processBulkAbortFileReqs(varTargetReqId, varFileIds, varNsHosts, varTargetType);
    ELSE
      processBulkAbortForRepack(varTargetReqId);
  END CASE;
END;
/

/* PL/SQL method to process bulk requests.
 *
 * Walks the NewRequests entries whose type is handled by the given service (through
 * Type2Obj.svcHandler), takes the first one that can be locked without waiting,
 * removes it from NewRequests and processes it. Only abort requests (type 50) are
 * dispatched here, to processBulkAbort.
 *
 * @param service     service handler whose requests should be processed
 * @param requestId   out: id of the processed request
 * @param rtype       out: type of the processed request, 0 when none could be taken
 * @param rIpAddress  out: client ip address, as returned by the processing
 * @param rport       out: client port, as returned by the processing
 * @param rReqUuid    out: reqId of the processed request, as returned by the processing
 * @param reuid       out: -1 for abort requests (not used)
 * @param regid       out: -1 for abort requests (not used)
 * @param freeParam   out: never assigned here
 * @param rSubResults out: per file results (fileId, nsHost, errorCode, errorMessage),
 *                    only opened when a request was processed
 * Errors raised while processing a taken request are propagated to the caller.
 */
CREATE OR REPLACE PROCEDURE processBulkRequest(service IN VARCHAR2, requestId OUT INTEGER,
                                               rtype OUT INTEGER, rIpAddress OUT INTEGER,
                                               rport OUT INTEGER, rReqUuid OUT VARCHAR2,
                                               reuid OUT INTEGER, regid OUT INTEGER,
                                               freeParam OUT VARCHAR2,
                                               rSubResults OUT castor.FileResult_Cur) AS
  CURSOR Rcur IS SELECT /*+ FIRST_ROWS(10) */ id
                   FROM NewRequests
                  WHERE type IN (
                    SELECT type FROM Type2Obj
                     WHERE svcHandler = service
                       AND svcHandler IS NOT NULL);
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
  gotLock BOOLEAN;
BEGIN
  -- in case we do not find anything, rtype should be 0
  rType := 0;
  OPEN Rcur;
  -- Loop on candidates until we can lock one
  LOOP
    -- Fetch next candidate
    FETCH Rcur INTO requestId;
    EXIT WHEN Rcur%NOTFOUND;
    -- Try to take a lock on the current candidate. Only this statement may fail
    -- because of concurrent processing, so only its failures are handled by moving
    -- on: errors raised by the processing itself must not be mistaken for them
    BEGIN
      SELECT type INTO rType FROM NewRequests WHERE id = requestId FOR UPDATE NOWAIT;
      gotLock := TRUE;
    EXCEPTION
      WHEN NO_DATA_FOUND THEN
        -- Go to next candidate, this request was processed already and disappeared
        gotLock := FALSE;
        rType := 0;
      WHEN SrLocked THEN
        -- Go to next candidate, this request is being processed by another thread
        gotLock := FALSE;
        rType := 0;
    END;
    IF gotLock THEN
      -- Since we are here, we got the lock. We have our winner
      DELETE FROM NewRequests WHERE id = requestId;
      -- Clear the temporary table for subresults
      DELETE FROM ProcessBulkRequestHelper;
      -- dispatch actual processing depending on request type
      CASE rType
        WHEN 50 THEN -- Abort Request
          processBulkAbort(requestId, rIpAddress, rport, rReqUuid);
          reuid := -1;  -- not used
          regid := -1;  -- not used
      END CASE;
      -- open cursor on results
      OPEN rSubResults FOR
        SELECT fileId, nsHost, errorCode, errorMessage FROM ProcessBulkRequestHelper;
      -- and exit the loop
      EXIT;
    END IF;
  END LOOP;
  CLOSE Rcur;
EXCEPTION WHEN OTHERS THEN
  -- do not leak the candidates cursor, then let the caller see the error
  IF Rcur%ISOPEN THEN
    CLOSE Rcur;
  END IF;
  RAISE;
END;
/

/* PL/SQL method to get the next failed SubRequest to do according to the given service */
/* the service parameter is not used now, it will with the new stager */
930
931
932
933
CREATE OR REPLACE PROCEDURE subRequestFailedToDo(srId OUT NUMBER, srFileName OUT VARCHAR2, srSubReqId OUT VARCHAR2,
                                                 srErrorCode OUT INTEGER, srErrorMessage OUT VARCHAR2, rReqId OUT VARCHAR2,
                                                 clIpAddress OUT INTEGER, clPort OUT INTEGER, clVersion OUT INTEGER,
                                                 srFileId OUT NUMBER) AS
934
935
  SrLocked EXCEPTION;
  PRAGMA EXCEPTION_INIT (SrLocked, -54);
936
937
938
  CURSOR c IS
     SELECT /*+ FIRST_ROWS(10) INDEX(SR I_SubRequest_RT_CT_ID) */ SR.id
       FROM SubRequest PARTITION (P_STATUS_7) SR; -- FAILED
939
940
941
942
943
944
  varSRId NUMBER;
  varCFId NUMBER;
  varRId NUMBER;
  varSrAnswered INTEGER;
  varRName VARCHAR2(100);
  varClientId NUMBER;
945
946
BEGIN
  OPEN c;
947
  LOOP
948
    FETCH c INTO varSRId;
949
950
    EXIT WHEN c%NOTFOUND;
    BEGIN
951
      SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ answered INTO varSrAnswered
952
        FROM SubRequest PARTITION (P_STATUS_7)
953
954
       WHERE id = varSRId FOR UPDATE NOWAIT;
      IF varSrAnswered = 1 THEN
955
        -- already answered, archive and move on
956
        archiveSubReq(varSRId, dconst.SUBREQUEST_FAILED_FINISHED);
957
958
        -- release the lock on this request as it's completed
        COMMIT;
959
      ELSE
960
961
        -- we got our subrequest, select all relevant data and hold the lock
        SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ fileName, subReqId, errorCode, errorMessage,
962
          (SELECT object FROM Type2Obj WHERE type = reqType), request, castorFile
963
964
965
          INTO srFileName, srSubReqId, srErrorCode, srErrorMessage, varRName, varRId, varCFId
          FROM SubRequest
         WHERE id = varSRId;
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
        srId := varSRId;
        srFileId := 0;
        BEGIN
          CASE
            WHEN varRName = 'StagePrepareToPutRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
                FROM StagePrepareToPutRequest WHERE id = varRId;
            WHEN varRName = 'StagePrepareToGetRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
                FROM StagePrepareToGetRequest WHERE id = varRId;
            WHEN varRName = 'StagePrepareToUpdateRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
                FROM StagePrepareToUpdateRequest WHERE id = varRId;
982
983
984
985
            WHEN varRName = 'StageRepackRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
                FROM StageRepackRequest WHERE id = varRId;
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
            WHEN varRName = 'StagePutRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
                FROM StagePutRequest WHERE id = varRId;
            WHEN varRName = 'StageGetRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
                FROM StageGetRequest WHERE id = varRId;
            WHEN varRName = 'StageUpdateRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
                FROM StageUpdateRequest WHERE id = varRId;
            WHEN varRName = 'StagePutDoneRequest' THEN
              SELECT reqId, client
                INTO rReqId, varClientId
For faster browsing, not all history is shown. View entire blame