Commit 84778142 authored by Elvin Sindrilaru's avatar Elvin Sindrilaru
Browse files

XROOTD: Implement the abort of requests for XRootD clients which disconnect while waiting for the

        response from the stager.
        Adjust the stalling of clients so that they are not delayed for more than 5 min.
        Use consistently the stagerhost variable set in the xrd.cf.manager file when sending
        requests to the stager.
        Fix the deletion of expired user requests - this part of the code should be rarely used
        given that we now have a proper way of cleaning up requests from disconnected clients.
        Move the definition of ReqElement to the .cpp file.
        Drop the createifnotexist opaque tag since it's not used anymore.
        Refactor some error messages and fix formatting.
parent b4e858a8
......@@ -62,7 +62,7 @@
/******************************************************************************/
XrdOucHash<XrdOucString>* XrdxCastor2Stager::msDelayStore;
xcastor::XrdxCastorClient* XrdxCastor2Fs::msCastorClient;
int XrdxCastor2Fs::msTokenLockTime = 5;
int XrdxCastor2Fs::msTokenLockTime = 60;
XrdxCastor2Fs* gMgr;
XrdSysError OfsEroute(0);
XrdOucTrace OfsTrace(&OfsEroute);
......@@ -172,14 +172,38 @@ XrdxCastor2Fs::newFile(char* user, int MonID)
}
//------------------------------------------------------------------------------
//! Notification when client disconnects
//!
//! Looks up the requests still registered for the disconnecting client (by
//! its tident) and sends a stage abort for each of them. The request elements
//! returned by GetUserRequests are deleted here once the abort has been sent.
//!
//! @param client identity of the disconnecting client; nothing is done if null
//------------------------------------------------------------------------------
void
XrdxCastor2Fs::Disc(const XrdSecEntity* client)
{
  // Guard first: every statement below dereferences the client
  if (!client)
    return;

  xcastor_debug("Client diconnected - proto=%s, name=%s, host=%s",
                client->prot, client->name, client->host);
  xcastor_debug("Client diconnect tident=%s", client->tident);
  std::vector<xcastor::XrdxCastorClient::ReqElement*>
  req_abrt = msCastorClient->GetUserRequests(client->tident);
  // Set client identity
  uid_t client_uid;
  gid_t client_gid;
  GetIdMapping(client, client_uid, client_gid);
  XrdOucErrInfo error;
  // The arguments must follow the order of the conversions in the format
  // string (tident first, then the count); the count is narrowed explicitly
  // because %i expects an int and size() returns a size_t.
  xcastor_info("tident=%s abort %i requests", client->tident,
               static_cast<int>(req_abrt.size()));

  for (std::vector<xcastor::XrdxCastorClient::ReqElement*>::iterator iter =
         req_abrt.begin();
       iter != req_abrt.end(); ++iter)
  {
    // Copy the id before the element is deleted so it can still be logged
    const std::string req_uuid = (*iter)->mRequest->reqId();
    xcastor_debug("aborting req_uuid=%s", req_uuid.c_str());

    // A failed abort is reported but does not stop the cleanup of the
    // remaining requests
    if (!XrdxCastor2Stager::StageAbortRequest(req_uuid,
                                              (*iter)->mRequest->svcClassName(),
                                              client_uid, client_gid, error))
    {
      xcastor_info("failed to abort req_uuid=%s", req_uuid.c_str());
    }

    // The elements handed out by GetUserRequests are released here
    delete (*iter);
  }
}
......@@ -631,7 +655,7 @@ XrdxCastor2Fs::stageprepare(const char* path,
"valid service class mapping for fn = ", map_path.c_str());
// Get the allowed service class, preference for the default one
TIMING("STAGERQUERY", &preparetiming);
TIMING("PREP2GET", &preparetiming);
struct XrdxCastor2Stager::RespInfo resp_info;
bool done = XrdxCastor2Stager::Prepare2Get(error, client_uid, client_gid,
map_path.c_str(), allowed_svc.c_str(),
......@@ -692,6 +716,11 @@ XrdxCastor2Fs::prepare(XrdSfsPrep& pargs,
break;
}
TIMING("END", &preparetiming);
if (gMgr->mLogLevel == LOG_DEBUG)
preparetiming.Print();
return SFS_OK;
}
......@@ -1062,10 +1091,11 @@ XrdxCastor2Fs::stat(const char* path,
// TODO: enable this after XRootD 4.0.2 - extend querying support for
// files on tape by returning also the backup exists flag
// buf->st_rdev = XRDSFS_HASBKUP;
// buf->st_rdev = XRDSFS_OFFLINE;
// buf->st_rdev &= XRDSFS_OFFLINE;
}
xcastor_debug("map_path=%s, stage_status=%s", map_path.c_str(), stage_status.c_str());
xcastor_debug("map_path=%s, stage_status=%s",
map_path.c_str(), stage_status.c_str());
}
TIMING("END", &stattiming);
......@@ -1128,7 +1158,7 @@ XrdxCastor2Fs::lstat(const char* path,
buf->st_nlink = 1;
buf->st_uid = 0;
buf->st_gid = 0;
buf->st_rdev = 0; /* device type (if inode device) */
buf->st_rdev = 0; // device type (if inode device)
buf->st_size = 0;
buf->st_blksize = 4096;
buf->st_blocks = 0;
......@@ -1739,8 +1769,6 @@ XrdxCastor2Fs::GetIdMapping(const XrdSecEntity* client, uid_t& uid, gid_t& gid)
pwdcpy = (struct passwd*)memcpy(pwdcpy, pw, sizeof(struct passwd));
uid = pwdcpy->pw_uid;
gid = pwdcpy->pw_gid;
xcastor_debug("passwd add mapping role=%s -> uid/gid=%i/%i",
role.c_str(), (int)uid, int(gid));
XrdSysMutexHelper scope_lock(mMutexPasswd);
mPasswdStore->Add(role.c_str(), pwdcpy, 60);
}
......@@ -1751,7 +1779,7 @@ XrdxCastor2Fs::GetIdMapping(const XrdSecEntity* client, uid_t& uid, gid_t& gid)
}
XrdxCastor2FsUFS::SetId(uid, gid);
xcastor_debug("uid/gid=%i/%i", (int)uid, (int)gid);
xcastor_debug("role=%s -> uid/gid=%i/%i", role.c_str(), (int)uid, (int)gid);
}
......
......@@ -157,8 +157,8 @@ public:
XrdOucErrInfo& eInfo,
const XrdSecEntity* client = 0,
const char* opaque = 0);
//----------------------------------------------------------------------------
//! Determine if file 'path' actually exists
//!
......@@ -193,7 +193,7 @@ public:
//----------------------------------------------------------------------------
//! Implementation specific command
//! Implement file system custom command
//----------------------------------------------------------------------------
int fsctl(const int cmd,
const char* args,
......@@ -466,11 +466,9 @@ public:
return mStagerHost;
}
static xcastor::XrdxCastorClient* msCastorClient; ///< obj dealing with async requests/responses
static int msTokenLockTime; ///< specifies the grace period for client to show
///< up on a disk server in seconds before the token expires
char* ConfigFN; ///< path to config file
XrdxCastor2Acc* mServerAcc; ///< authorization module for token encryption/decryption
std::string mZeroProc; ///< path to a 0-byte file in the proc filesystem
......@@ -585,7 +583,7 @@ private:
//----------------------------------------------------------------------------
//! Stall message
//! Create stall message
//!
//! @param error error text and code
//! @param stime seconds to stall
......@@ -599,7 +597,7 @@ private:
//----------------------------------------------------------------------------
//! Get delay for the current operation set for the entire instance by the
//! admin. The delay value is read from the xcastor2.proc value specified
//! in the /etc/xrd.cf file.
//! in the /etc/xrd.cf.manager file.
//!
//! @param msg message to be returned to the client
//! @param isRW true if delay value is requested for a write operation,
......@@ -614,8 +612,9 @@ private:
std::set<std::string> mFsSet; ///< set of known diskserver hosts
XrdSysMutex mMutexFsSet; ///< mutex for the set of known diskservers
std::map<std::string, std::string> mNsMap; ///< namespace mapping
//! Map path to allowed svcClasses and the no_hsm option
std::map< std::string,
std::pair<std::list<std::string>, bool> > mStageMap;///< map path -> allowed svc
std::pair<std::list<std::string>, bool> > mStageMap;
XrdOucHash<struct passwd>* mPasswdStore; ///< cache passwd struct info
std::map<std::string, std::string> mRoleMap; ///< user role map
XrdSysMutex mMutexPasswd; ///< mutex for the passwd store
......@@ -623,6 +622,4 @@ private:
std::string mStagerHost; ///< stager host to which requests are sent
};
extern XrdxCastor2Fs* gMgr; ///< global instance of the redirector OFS subsystem
......@@ -29,6 +29,6 @@
//! Constants dealing with async requests for the stager
//! Timeout after the response for an async request was received from the
//! stager, during which the client needs to show up to collect it
#define XCASTOR2FS_RESP_TIMEOUT 600
#define XCASTOR2FS_RESP_TIMEOUT 360
//! Maximum number of async requests in-flight
#define XCASTOR2FS_MAX_REQUESTS 2000
......@@ -264,8 +264,8 @@ XrdxCastor2FsFile::open(const char* path,
if (allowed_svc.empty())
{
return gMgr->Emsg(epname, error, EINVAL, "open - no valid service"
" class for fn=", map_path.c_str());
return gMgr->Emsg(epname, error, EINVAL, "open - no valid service class"
" for fn=", map_path.c_str());
}
// Create response structure in which the pfn2 has the following structure
......@@ -554,7 +554,7 @@ XrdxCastor2FsFile::open(const char* path,
pfn1 = XrdOucString(resp_info.mRedirectionPfn1, slashpos+1);
pool = XrdOucString(resp_info.mRedirectionPfn1, 0, slashpos-1);
}
// Add the opaque authorization information for the server for read & write
XrdxCastor2Acc::AuthzInfo authz;
authz.sfn = (char*) origpath;
......@@ -603,10 +603,6 @@ XrdxCastor2FsFile::open(const char* path,
resp_info.mRedirectionHost += buf.csumvalue;
resp_info.mRedirectionHost += "&";
}
// Add the 0-size create flag
if (buf.filesize == 0)
resp_info.mRedirectionHost += "createifnotexist=1&";
}
int ecode = atoi(gMgr->mSrvTargetPort.c_str());
......
......@@ -71,7 +71,7 @@ public:
//! @param client authentication credentials, if any
//! @param info opaque information to be used as seen fit
//!
//! @return OOSS_OK upon success, otherwise SFS_ERROR is returned
//! @return SFS_REDIRECT upon success, otherwise SFS_ERROR is returned
//----------------------------------------------------------------------------
int open(const char* fileName,
XrdSfsFileOpenMode openMode,
......@@ -102,7 +102,8 @@ public:
//----------------------------------------------------------------------------
int getMmap(void** Addr, off_t& Size)
{
if (Addr) Addr = 0;
if (Addr)
Addr = 0;
Size = 0;
return SFS_OK;
......@@ -207,7 +208,7 @@ public:
//----------------------------------------------------------------------------
//!
//! Get checksum info
//----------------------------------------------------------------------------
int getCXinfo(char* cxtype, int& cxrsz)
{
......@@ -216,14 +217,13 @@ public:
//----------------------------------------------------------------------------
//! Implementation specific command
//! Implement custom commands
//----------------------------------------------------------------------------
int fctl(int, const char*, XrdOucErrInfo&)
{
return 0;
}
private:
int oh; ///< file handler
......
This diff is collapsed.
......@@ -18,8 +18,7 @@
*
*
* @author Castor Dev team, castor-dev@cern.ch
* @author Castor Dev team, castor-dev@cern.ch
*
*
******************************************************************************/
#pragma once
......@@ -60,7 +59,7 @@ namespace castor
class XrdxCastor2Stager : public LogId
{
public:
//----------------------------------------------------------------------------
//! Request information structure
//----------------------------------------------------------------------------
......@@ -70,7 +69,7 @@ public:
gid_t mGid;
const char* mPath;
const char* mServiceClass;
//--------------------------------------------------------------------------
//! Constructor
......@@ -84,7 +83,7 @@ public:
// empty
}
};
//----------------------------------------------------------------------------
......@@ -96,7 +95,7 @@ public:
XrdOucString mRedirectionPfn1;
XrdOucString mRedirectionPfn2;
XrdOucString mStageStatus;
//-------------------------------------------------------------------------
//! Constructor
//-------------------------------------------------------------------------
......@@ -106,25 +105,25 @@ public:
mRedirectionPfn2(""),
mStageStatus("")
{
//empty
// empty
}
};
//----------------------------------------------------------------------------
//! Constructor
//----------------------------------------------------------------------------
XrdxCastor2Stager();
//----------------------------------------------------------------------------
//! Destructor
//----------------------------------------------------------------------------
~XrdxCastor2Stager();
//----------------------------------------------------------------------------
//! Get a delay value for the corresponding tag
//! Get a delay value for the corresponding tag
//!
//! @param tag is made by concatenating the tident with the path of the req
//!
......@@ -137,7 +136,6 @@ public:
//! Drop delay tag from mapping
//!
//! @param tag tag to be dropped from mapping
//!
//----------------------------------------------------------------------------
static void DropDelayTag(const char* tag);
......@@ -145,34 +143,33 @@ public:
//----------------------------------------------------------------------------
//! Delete the request and response objects from the maps
//!
//! @param req request object
//! @param resp response object
//! @param req request object
//! @param resp response object
//! @param respvec response vector
//!
//----------------------------------------------------------------------------
static void DeleteReqResp(castor::stager::FileRequest* req,
castor::client::IResponseHandler* resp,
std::vector<castor::rh::Response*>* respvec);
//----------------------------------------------------------------------------
//! Process response received from the stager
//!
//!
//! @param error error object
//! @param respElem ReqElement structure defined in XrdxCastorClient.hh
//! @param opType operation type
//! @param reqInfo request information structure
//! @param respInfo response info structure to be filled in
//!
//! @return
//! @return SFS_OK if successful, otherwise SFS_ERROR.
//--------------------------------------------------------------------------
static int ProcessResponse(XrdOucErrInfo& error,
struct xcastor::XrdxCastorClient::ReqElement*& respElem,
const std::string& opType,
struct ReqInfo* reqInfo,
struct RespInfo& respInfo);
//----------------------------------------------------------------------------
//! Prepare2Get
//----------------------------------------------------------------------------
......@@ -181,27 +178,44 @@ public:
const char* path,
const char* serviceclass,
struct RespInfo& respInfo);
//----------------------------------------------------------------------------
//! Send an async request to the stager. This request can be a GET or a PUT
//! or an UPDATE.
//!
//! @param error error object
//! @param error error object
//! @param opType type of operation: get, put or update
//! @param reqInfo request information structure
//! @param respInfo response information structure
//!
//!
//! @return SFS_OK answer received and successfully parsed
//! SFS_ERROR there was an error
//! SFS_STALL response not available yet, stall the client
//!
//! SFS_STALL response not available yet, stall the client
//----------------------------------------------------------------------------
static int DoAsyncReq(XrdOucErrInfo& error,
const std::string& opType,
struct ReqInfo* reqInfo,
struct RespInfo& respInfo);
//----------------------------------------------------------------------------
//! Send stage abort request to the stager daemon
//!
//! @param req_uuid request uuid which we are aborting
//! @param svc_class request service class
//! @param uid client uid
//! @param gid client gid
//! @param error error object
//!
//! @return True if successful, otherwise false.
//----------------------------------------------------------------------------
static bool StageAbortRequest(const std::string& req_uuid,
const std::string& svc_class,
uid_t uid, gid_t gid,
XrdOucErrInfo& error);
//----------------------------------------------------------------------------
//! Rm
//----------------------------------------------------------------------------
......@@ -222,10 +236,9 @@ public:
static XrdOucHash<XrdOucString>* msDelayStore; ///< delay store for each of the users
private:
static XrdSysRWLock msLockStore; ///< RW lock for the delay map
};
This diff is collapsed.
......@@ -18,8 +18,7 @@
*
*
* @author Castor Dev team, castor-dev@cern.ch
* @author Castor Dev team, castor-dev@cern.ch
*
*
******************************************************************************/
#pragma once
......@@ -29,6 +28,7 @@
#include <sys/ioctl.h>
#include <sys/poll.h>
#include <string>
#include <vector>
#include <map>
/*----------------------------------------------------------------------------*/
#include "XrdxCastorNamespace.hpp"
......@@ -53,20 +53,19 @@ class XrdxCastorClient: public LogId
{
public :
///< Forward declaration of struct
///< Forward declaration of struct
struct ReqElement;
//! Convenience typedef for map of async requests
typedef std::map<std::string, struct ReqElement*> AsyncReqMap;
typedef std::map<std::string, AsyncReqMap::iterator> AsyncUserMap;
//----------------------------------------------------------------------------
//! Obtain a new instance of the object while checking that the poller thread
//! was also started successfully.
//!
//! @return new object instance
//! Obtain a new instance of the object while also checking that the poller
//! thread was started successfully
//!
//! @return new object instance
//----------------------------------------------------------------------------
static XrdxCastorClient* Create();
......@@ -78,29 +77,27 @@ public :
//----------------------------------------------------------------------------
//! Send asynchronous request
//! Send asynchronous request
//!
//! @param iterHint hint where the element was inserted
//! @param userId user unique id for the request
//! @param rhHost stager host
//! @param rhPort port on the stager to where the req is sent
//! @param req request object
//! @param rh response handler object
//! @param rhPort port on the stager to where the req is sent
//! @param req request object
//! @param rh response handler object
//!
//! @return SFS_OK if request sent successfully
//! SFS_STALL the client is to back-off, this happens when there are
//! already to many requests on-the-fly
//!
//! SFS_STALL client stalled, this happens when there are already
//! too many requests in-flight
//----------------------------------------------------------------------------
virtual int SendAsyncRequest(const std::string& userId,
const std::string& rhHost,
unsigned int rhPort,
unsigned int rhPort,
castor::stager::Request* req,
castor::client::IResponseHandler* rh,
std::vector<castor::rh::Response*>* respvec)
;
std::vector<castor::rh::Response*>* respvec);
//----------------------------------------------------------------------------
//! Get response from the stager about a request sent earlier
//!
......@@ -110,7 +107,7 @@ public :
//! a response immediately after submitting the request and not after
//! doing a stall
//!
//! @return element containing the response, only if the response is
//! @return element containing the response, only if the response is
//! available otherwise return 0
//----------------------------------------------------------------------------
struct ReqElement*
......@@ -118,20 +115,31 @@ public :
//----------------------------------------------------------------------------
//! Check if the user has already submitted the current request. If so, this
//! means it comes back to collect the response after a stall.
//!
//! Check if the user has already submitted the current request. If so, this
//! means it comes back to collect the response after a stall.
//!
//! @param path file path for the request
//! @param error error information
//!
//! @return true if request already submitted, otherwise false
//!
//----------------------------------------------------------------------------
bool HasSubmittedReq(const char* path, XrdOucErrInfo& error);
//----------------------------------------------------------------------------
//! Collect all requests belonging to the supplied user.
//! NOTE(review): this comment used to describe strings of the form
//! req_uuid:svcClass, but the method returns ReqElement pointers - confirm.
//!
//! @param tident user identity
//!
//! @return vector of requests to be aborted
//----------------------------------------------------------------------------
std::vector<ReqElement*> GetUserRequests(const std::string& tident);
//----------------------------------------------------------------------------
//! Method polling for responses from the stager and adding these responses
//! Method polling for responses from the stager and adding these responses
//! to the mMapRequests, setting the appropriate flag for the current request.
//----------------------------------------------------------------------------
virtual void PollResponses();
......@@ -141,13 +149,12 @@ public :
//! Method used to start the poller thread
//!
//! @param arg pointer to the XrdxCastorClient object
//!
//----------------------------------------------------------------------------
static void* StartPollerThread(void* arg);
//----------------------------------------------------------------------------
//! Get the status of the poller thread
//! Get the status of the poller thread
//!
//! @return true when poller thread not started, otherwise false
//----------------------------------------------------------------------------
......@@ -158,120 +165,60 @@ public :
//----------------------------------------------------------------------------
//! The ReqElement structure encapsulates the request object sent to the stager
//! The ReqElement structure encapsulates the request object sent to the stager
//! and also the response handler. These objects are stored in the mMapRequests
//! and are accessed by both the async thread polling for responses and also
//! by clients checking in for their responses.
//----------------------------------------------------------------------------
struct ReqElement
{
std::string mUserId; ///< id of the user who submited the request
castor::stager::Request* mRequest; ///< request object sent to stager
castor::client::IResponseHandler* mRh; ///< response hadler
std::string mUserId; ///< id of the user who submited the request
castor::stager::Request* mRequest; ///< request object sent to stager
castor::client::IResponseHandler* mRh; ///< response handler
std::vector<castor::rh::Response*>* mRespVec; ///< vector of responses
struct timeval mSendTime; ///< time when request was sent to stager
struct timeval mRecvTime; ///< time when response was received from stager
struct timeval mSendTime; ///< time when request was sent to stager
struct timeval mRecvTime; ///< time when response was received from stager
//--------------------------------------------------------------------------
//! Constructor
//!
//!
//! @param req request submited to stager
//! @param rh response handler object
//!
//! @param rh response handler object
//!
//--------------------------------------------------------------------------
ReqElement(const std::string& userId,
castor::stager::Request* req,
castor::client::IResponseHandler* rh,
std::vector<castor::rh::Response*>* respvec):
mUserId(userId),
m