Commit 954b7813 authored by Sebastien Ponce
Browse files

Introduced ORACLE alerting mechanism in the stager

parent 66e70d12
......@@ -132,6 +132,10 @@ castor::db::ora::OraCommonSvc::requestToDo(std::string service)
m_requestToDoStatement->registerOutParam
(3, oracle::occi::OCCIINT);
m_requestToDoStatement->setAutoCommit(true);
// also register for associated alert
oracle::occi::Statement* registerAlertStmt =
createStatement("BEGIN DBMS_ALERT.REGISTER('wakeUp" + service + "'); END;");
registerAlertStmt->execute();
}
// execute the statement
m_requestToDoStatement->setString(1, service);
......
......@@ -354,6 +354,10 @@ castor::db::ora::OraStagerSvc::subRequestToDo
m_subRequestToDoStatement->registerOutParam
(29, oracle::occi::OCCIINT);
m_subRequestToDoStatement->setAutoCommit(true);
// also register for associated alert
oracle::occi::Statement* registerAlertStmt =
createStatement("BEGIN DBMS_ALERT.REGISTER('wakeUp" + service + "'); END;");
registerAlertStmt->execute();
}
m_subRequestToDoStatement->setString(1, service);
......@@ -488,6 +492,10 @@ castor::db::ora::OraStagerSvc::processBulkRequest
m_processBulkRequestStatement->registerOutParam
(10, oracle::occi::OCCICURSOR);
m_processBulkRequestStatement->setAutoCommit(true);
// also register for associated alert
oracle::occi::Statement* registerAlertStmt =
createStatement("BEGIN DBMS_ALERT.REGISTER('wakeUp" + service + "'); END;");
registerAlertStmt->execute();
}
m_processBulkRequestStatement->setString(1, service);
......@@ -569,6 +577,10 @@ castor::db::ora::OraStagerSvc::subRequestFailedToDo()
(9, oracle::occi::OCCIINT);
m_subRequestFailedToDoStatement->registerOutParam
(10, oracle::occi::OCCIDOUBLE);
// also register for associated alert
oracle::occi::Statement* registerAlertStmt =
createStatement("BEGIN DBMS_ALERT.REGISTER('wakeUpErrorSvc'); END;");
registerAlertStmt->execute();
}
// execute the statement and see whether we found something
unsigned int rc =
......@@ -1180,6 +1192,7 @@ castor::db::ora::OraStagerSvc::handlePrepareToGet(u_signed64 cfId,
m_handlePrepareToGetStatement = createStatement
("BEGIN :1 := handlePrepareToGet(:2, :3, :4, :5, :6); END;");
m_handlePrepareToGetStatement->registerOutParam(1, oracle::occi::OCCIINT);
m_handlePrepareToGetStatement->setAutoCommit(true);
}
// Execute the statement
m_handlePrepareToGetStatement->setDouble(2, cfId);
......@@ -1250,6 +1263,7 @@ bool castor::db::ora::OraStagerSvc::handlePrepareToPut(u_signed64 cfId,
m_handlePrepareToPutStatement = createStatement
("BEGIN :1 := handlePrepareToPut(:2, :3, :4, :5); END;");
m_handlePrepareToPutStatement->registerOutParam(1, oracle::occi::OCCIINT);
m_handlePrepareToPutStatement->setAutoCommit(true);
}
// Execute the statement
m_handlePrepareToPutStatement->setDouble(2, cfId);
......
......@@ -32,7 +32,7 @@ BEGIN
-- dealing with the same copy.
SELECT status INTO stat FROM DiskCopy WHERE id = dcId;
-- First the invalid case
IF stat = 7 THEN -- INVALID
IF stat = dconst.DISKCOPY_INVALID THEN
raise_application_error(-20106, 'Could not update an invalid copy of a file (file has been modified by somebody else concurrently)');
END IF;
-- Then the disk only check
......@@ -82,7 +82,7 @@ BEGIN
-- If we are not having a STAGEOUT diskCopy, we are the only ones to write,
-- so we have to setup everything
-- invalidate all diskcopies
UPDATE DiskCopy SET status = 7 -- INVALID
UPDATE DiskCopy SET status = dconst.DISKCOPY_INVALID
WHERE castorFile = cfId
AND status IN (0, 10);
-- except the one we are dealing with that goes to STAGEOUT
......@@ -93,7 +93,7 @@ BEGIN
deleteMigrationJobs(cfId);
END IF;
-- Invalidate any ongoing replications
UPDATE DiskCopy SET status = 7 -- INVALID
UPDATE DiskCopy SET status = dconst.DISKCOPY_INVALID
WHERE castorFile = cfId
AND status = 1; -- WAITDISK2DISKCOPY
END;
......@@ -137,7 +137,7 @@ BEGIN
AND SubRequest.id = srId
AND SubRequest.request = Request.id;
-- Check that we did not cancel the SubRequest in the mean time
IF srStatus IN (7, 9) THEN -- FAILED, FAILED_FINISHED
IF srStatus IN (dconst.SUBREQUEST_FAILED, dconst.SUBREQUEST_FAILED_FINISHED) THEN
raise_application_error(-20104, 'SubRequest canceled while queuing in scheduler. Giving up.');
END IF;
-- Get selected filesystem
......@@ -276,7 +276,7 @@ EXCEPTION WHEN NO_DATA_FOUND THEN
END IF;
-- It was not an update creating a file, so we fail
UPDATE SubRequest
SET status = 7, errorCode = 1725, errorMessage='Request canceled while queuing'
SET status = dconst.SUBREQUEST_FAILED, errorCode = 1725, errorMessage='Request canceled while queuing'
WHERE id = srId;
COMMIT;
raise_application_error(-20114, 'File invalidated while queuing in the scheduler, please try again');
......@@ -694,7 +694,7 @@ CREATE OR REPLACE PROCEDURE getUpdateFailedProcExt
BEGIN
-- Fail the subrequest. The stager will try and answer the client
UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
SET status = 7, -- FAILED
SET status = dconst.SUBREQUEST_FAILED,
errorCode = errno,
errorMessage = errmsg
WHERE id = srId
......@@ -726,7 +726,7 @@ BEGIN
WHERE id = srId;
-- Fail the subRequest
UPDATE /*+ INDEX(Subrequest PK_Subrequest_Id)*/ SubRequest
SET status = 7, -- FAILED
SET status = dconst.SUBREQUEST_FAILED,
errorCode = errno,
errorMessage = errmsg
WHERE id = srId;
......@@ -931,6 +931,22 @@ BEGIN
END;
/
/* Row-level trigger waking up the error service.
 * Fires after any UPDATE that sets SubRequest.status to 7 (SUBREQUEST_FAILED)
 * and signals the 'wakeUpErrorSvc' alert with an empty message.
 * NOTE(review): the status is a literal here, presumably because a trigger
 * WHEN clause cannot reference the dconst package constants - confirm.
 * Per the DBMS_ALERT documentation, the signal is only delivered once the
 * updating transaction commits.
 */
CREATE OR REPLACE TRIGGER tr_SubRequest_informError
AFTER UPDATE OF status ON SubRequest
FOR EACH ROW
WHEN (new.status = 7)  -- SUBREQUEST_FAILED
DECLARE
  -- name of the alert signalled to the error service
  varAlertName CONSTANT VARCHAR2(30) := 'wakeUpErrorSvc';
BEGIN
  DBMS_ALERT.SIGNAL(varAlertName, '');
END;
/
/* Row-level trigger waking up the service in charge of a subrequest.
 * Fires after any UPDATE that sets SubRequest.status to one of
 *   0 (SUBREQUEST_START), 1 (SUBREQUEST_RESTART) or 2 (SUBREQUEST_RETRY)
 * and signals the alert named 'wakeUp' followed by the row's svcHandler,
 * with an empty message.
 */
CREATE OR REPLACE TRIGGER tr_SubRequest_informRestart
AFTER UPDATE OF status ON SubRequest
FOR EACH ROW
WHEN (new.status IN (0, 1, 2))  -- START, RESTART, RETRY
BEGIN
  DBMS_ALERT.SIGNAL('wakeUp'||:new.svcHandler, '');
END;
/
CREATE OR REPLACE FUNCTION selectRandomDestinationFs(inSvcClassId IN INTEGER,
inMinFreeSpace IN INTEGER,
inCfId IN INTEGER)
......
......@@ -92,6 +92,8 @@ BEGIN
VALUES (clientIP,clientPort,clientVersion,clientSecure,clientId);
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpJobSvc', '');
END;
/
......@@ -180,6 +182,21 @@ BEGIN
-- insert the subrequest
INSERT INTO SubRequest (retryCounter, fileName, protocol, xsize, priority, subreqId, flags, modeBits, creationTime, lastModificationTime, answered, errorCode, errorMessage, requestedFileSystems, svcHandler, id, diskcopy, castorFile, status, request, getNextStatus, reqType)
VALUES (0, srFileNames(i), srProtocols(i), srXsizes(i), 0, NULL, srFlags(i), srModeBits(i), creationTime, creationTime, 0, 0, '', NULL, svcHandler, subreqId, NULL, NULL, dconst.SUBREQUEST_START, reqId, 0, inReqType);
-- send an alert to accelerate the processing of the request.
-- The alert name depends on the service handling this request type.
-- The alert is only an accelerator, so an unlisted request type must not
-- abort the insertion: without the ELSE branch, a PL/SQL CASE statement
-- raises CASE_NOT_FOUND (ORA-06592) when no WHEN clause matches.
CASE
  WHEN inReqType = 35 OR   -- StageGetRequest
       inReqType = 40 OR   -- StagePutRequest
       inReqType = 44 THEN -- StageUpdateRequest
    DBMS_ALERT.SIGNAL('wakeUpJobReqSvc', '');
  WHEN inReqType = 36 OR   -- StagePrepareToGetRequest
       inReqType = 37 OR   -- StagePrepareToPutRequest
       inReqType = 38 THEN -- StagePrepareToUpdateRequest
    DBMS_ALERT.SIGNAL('wakeUpPrepReqSvc', '');
  WHEN inReqType = 42 OR   -- StageRmRequest
       inReqType = 39 OR   -- StagePutDoneRequest
       inReqType = 95 THEN -- SetFileGCWeight
    DBMS_ALERT.SIGNAL('wakeUpStageReqSvc', '');
  ELSE
    -- no dedicated alert for this request type: nothing to signal
    NULL;
END CASE;
END LOOP;
END;
/
......@@ -233,6 +250,8 @@ BEGIN
VALUES (clientIP,clientPort,clientVersion,clientSecure,clientId);
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpJobSvc', '');
END;
/
......@@ -275,6 +294,8 @@ BEGIN
VALUES (clientIP,clientPort,clientVersion,clientSecure,clientId);
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpQueryReqSvc', '');
END;
/
......@@ -328,6 +349,8 @@ BEGIN
END LOOP;
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpQueryReqSvc', '');
END;
/
......@@ -372,6 +395,8 @@ BEGIN
VALUES (clientIP,clientPort,clientVersion,clientSecure,clientId);
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpQueryReqSvc', '');
END;
/
......@@ -420,6 +445,8 @@ BEGIN
VALUES (clientIP,clientPort,clientVersion,clientSecure,clientId);
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpJobSvc', '');
END;
/
......@@ -462,6 +489,8 @@ BEGIN
VALUES (clientIP,clientPort,clientVersion,clientSecure,clientId);
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpGCSvc', '');
END;
/
......@@ -506,6 +535,8 @@ BEGIN
VALUES (clientIP,clientPort,clientVersion,clientSecure,clientId);
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpQueryReqSvc', '');
END;
/
......@@ -561,6 +592,8 @@ BEGIN
END LOOP;
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpBulkStageReqSvc', '');
END;
/
......@@ -625,6 +658,8 @@ BEGIN
END LOOP;
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpGCSvc', '');
END;
/
......@@ -687,5 +722,7 @@ BEGIN
END LOOP;
-- insert a row into newRequests table to trigger the processing of the request
INSERT INTO newRequests (id, type, creation) VALUES (reqId, reqType, to_date('01011970','ddmmyyyy') + 1/24/60/60 * creationTime);
-- send an alert to accelerate the processing of the request
DBMS_ALERT.SIGNAL('wakeUpQueryReqSvc', '');
END;
/
......@@ -374,16 +374,31 @@ CREATE OR REPLACE PROCEDURE subRequestToDo(service IN VARCHAR2,
varSrId NUMBER;
varRName VARCHAR2(100);
varClientId NUMBER;
varUnusedMessage VARCHAR2(2048);
varUnusedStatus INTEGER;
BEGIN
-- Open a cursor on potential candidates
OPEN SRcur;
-- Loop on candidates until we can lock one
LOOP
-- Fetch next candidate
FETCH SRcur INTO varSrId;
IF SRcur%NOTFOUND THEN
-- No candidate, just return
-- Retrieve the first candidate
FETCH SRCur INTO varSrId;
IF SRCur%NOTFOUND THEN
-- There is no candidate available. Wait for next alert for a maximum of 3 seconds.
-- We do not wait forever in order to give the control back to the
-- caller daemon in case it should exit.
CLOSE SRCur;
DBMS_ALERT.WAITONE('wakeUp'||service, varUnusedMessage, varUnusedStatus, 3);
-- try again to find something now that we waited
OPEN SRCur;
FETCH SRCur INTO varSrId;
IF SRCur%NOTFOUND THEN
-- still nothing. We will give back the control to the application
-- so that it can handle cases like signals and exit. We will probably
-- be back soon :-)
RETURN;
END IF;
END IF;
-- Loop on candidates until we can lock one
LOOP
BEGIN
-- Try to take a lock on the current candidate, and revalidate its status
SELECT /*+ INDEX(SR PK_SubRequest_ID) */ id INTO varSrId
......@@ -406,6 +421,13 @@ BEGIN
-- Go to next candidate, this subrequest is being processed by another thread
NULL;
END;
-- we are here because the current candidate could not be handled
-- let's go to the next one
FETCH SRcur INTO varSrId;
IF SRcur%NOTFOUND THEN
-- no next one ? then we can return
RETURN;
END IF;
END LOOP;
CLOSE SRcur;
......@@ -884,15 +906,33 @@ CREATE OR REPLACE PROCEDURE processBulkRequest(service IN VARCHAR2, requestId OU
AND svcHandler IS NOT NULL);
SrLocked EXCEPTION;
PRAGMA EXCEPTION_INIT (SrLocked, -54);
varUnusedMessage VARCHAR2(2048);
varUnusedStatus INTEGER;
BEGIN
-- in case we do not find anything, rtype should be 0
rType := 0;
-- Open a cursor on potential candidates
OPEN Rcur;
-- Retrieve the first candidate
FETCH Rcur INTO requestId;
IF Rcur%NOTFOUND THEN
-- There is no candidate available. Wait for next alert for a maximum of 3 seconds.
-- We do not wait forever in order to give the control back to the
-- caller daemon in case it should exit.
CLOSE Rcur;
DBMS_ALERT.WAITONE('wakeUp'||service, varUnusedMessage, varUnusedStatus, 3);
-- try again to find something now that we waited
OPEN Rcur;
FETCH Rcur INTO requestId;
IF Rcur%NOTFOUND THEN
-- still nothing. We will give back the control to the application
-- so that it can handle cases like signals and exit. We will probably
-- be back soon :-)
RETURN;
END IF;
END IF;
-- Loop on candidates until we can lock one
LOOP
-- Fetch next candidate
FETCH Rcur INTO requestId;
EXIT WHEN Rcur%NOTFOUND;
BEGIN
-- Try to take a lock on the current candidate
SELECT type INTO rType FROM NewRequests WHERE id = requestId FOR UPDATE NOWAIT;
......@@ -920,6 +960,13 @@ BEGIN
-- Go to next candidate, this request is being processed by another thread
NULL;
END;
-- we are here because the current candidate could not be handled
-- let's go to the next one
FETCH Rcur INTO requestId;
IF Rcur%NOTFOUND THEN
-- no next one ? then we can return
RETURN;
END IF;
END LOOP;
CLOSE Rcur;
END;
......@@ -942,11 +989,31 @@ CREATE OR REPLACE PROCEDURE subRequestFailedToDo(srId OUT NUMBER, srFileName OUT
varSrAnswered INTEGER;
varRName VARCHAR2(100);
varClientId NUMBER;
varUnusedMessage VARCHAR2(2048);
varUnusedStatus INTEGER;
BEGIN
-- Open a cursor on potential candidates
OPEN c;
LOOP
-- Retrieve the first candidate
FETCH c INTO varSRId;
IF c%NOTFOUND THEN
-- There is no candidate available. Wait for next alert for a maximum of 3 seconds.
-- We do not wait forever in order to give the control back to the
-- caller daemon in case it should exit.
CLOSE c;
DBMS_ALERT.WAITONE('wakeUpErrorSvc', varUnusedMessage, varUnusedStatus, 3);
-- try again to find something now that we waited
OPEN c;
FETCH c INTO varSRId;
EXIT WHEN c%NOTFOUND;
IF c%NOTFOUND THEN
-- still nothing. We will give back the control to the application
-- so that it can handle cases like signals and exit. We will probably
-- be back soon :-)
RETURN;
END IF;
END IF;
-- Loop on candidates until we can lock one
LOOP
BEGIN
SELECT /*+ INDEX(Subrequest PK_Subrequest_Id)*/ answered INTO varSrAnswered
FROM SubRequest PARTITION (P_STATUS_7)
......@@ -1036,6 +1103,11 @@ BEGIN
-- Go to next candidate, this subrequest is being processed by another thread
NULL;
END;
FETCH c INTO varSRId;
IF c%NOTFOUND THEN
-- no next one ? then we can return
RETURN;
END IF;
END LOOP;
CLOSE c;
END;
......@@ -1044,17 +1116,30 @@ END;
/* PL/SQL method to get the next request to do according to the given service.
 *
 * Deletes at most one row of NewRequests whose type is handled by the given
 * service (according to Type2Obj) and returns its id and type.
 * When nothing is pending, waits for the 'wakeUp'||service alert for a
 * maximum of 3 seconds and tries exactly once more.
 *
 * @param service the name of the service handler asking for work
 * @param rId     id of the request taken, or 0 if there is nothing to do
 * @param rType   type of the request taken, or 0 if there is nothing to do
 */
CREATE OR REPLACE PROCEDURE requestToDo(service IN VARCHAR2, rId OUT INTEGER, rType OUT INTEGER) AS
  varUnusedMessage VARCHAR2(2048);
  varUnusedStatus INTEGER;
BEGIN
  DELETE FROM NewRequests
   WHERE type IN (SELECT type FROM Type2Obj
                   WHERE svcHandler = service
                     AND svcHandler IS NOT NULL)
     AND ROWNUM < 2 RETURNING id, type INTO rId, rType;
  -- A DELETE ... RETURNING INTO that matches no row does not raise
  -- NO_DATA_FOUND, so the "nothing found" case is detected via SQL%ROWCOUNT.
  IF SQL%ROWCOUNT = 0 THEN
    -- There is no candidate available. Wait for next alert for a maximum of 3 seconds.
    -- We do not wait forever in order to give the control back to the
    -- caller daemon in case it should exit.
    DBMS_ALERT.WAITONE('wakeUp'||service, varUnusedMessage, varUnusedStatus, 3);
    -- try again to find something now that we waited
    DELETE FROM NewRequests
     WHERE type IN (SELECT type FROM Type2Obj
                     WHERE svcHandler = service
                       AND svcHandler IS NOT NULL)
       AND ROWNUM < 2 RETURNING id, type INTO rId, rType;
    IF SQL%ROWCOUNT = 0 THEN
      rId := 0;   -- nothing to do
      rType := 0;
    END IF;
  END IF;
END;
/
......
......@@ -38,6 +38,7 @@
#include "castor/PortNumbers.hpp"
#include "castor/System.hpp"
#include "castor/server/SignalThreadPool.hpp"
#include "castor/server/DbAlertedThreadPool.hpp"
#include "castor/replier/RequestReplier.hpp"
#include "castor/db/DbCnvSvc.hpp"
......@@ -73,43 +74,43 @@ int main(int argc, char* argv[]){
/* thread pools for the stager */
/*******************************/
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("JobRequestSvcThread",
new castor::stager::daemon::JobRequestSvcThread()));
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("PrepRequestSvcThread",
new castor::stager::daemon::PrepRequestSvcThread()));
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("StageRequestSvcThread",
new castor::stager::daemon::StageRequestSvcThread()));
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("BulkStageReqSvcThread",
new castor::stager::daemon::BulkStageReqSvcThread()));
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("QueryRequestSvcThread",
new castor::stager::daemon::QueryRequestSvcThread()));
// These threads poll the database every 2 seconds.
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("ErrorSvcThread",
new castor::stager::daemon::ErrorSvcThread(), 2));
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("jobSvcThread",
new castor::stager::daemon::JobSvcThread()));
stagerDaemon.addThreadPool
(new castor::server::SignalThreadPool
(new castor::server::DbAlertedThreadPool
("GcSvcThread",
new castor::stager::daemon::GcSvcThread()));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment