Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
cta
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Harbor Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
dCache
cta
Commits
ebf151d7
Commit
ebf151d7
authored
7 years ago
by
Michael Davis
Browse files
Options
Downloads
Patches
Plain Diff
[EOS-CTA] Implements DELETE workflow
parent
9a81a99a
Branches
Branches containing commit
Tags
Tags containing commit
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
xroot_plugins/XrdSsiCtaRequestProc.cpp
+77
-56
77 additions, 56 deletions
xroot_plugins/XrdSsiCtaRequestProc.cpp
xroot_ssi_pb/XrdSsiPbResource.hpp
+4
-8
4 additions, 8 deletions
xroot_ssi_pb/XrdSsiPbResource.hpp
with
81 additions
and
64 deletions
xroot_plugins/XrdSsiCtaRequestProc.cpp
+
77
−
56
View file @
ebf151d7
...
...
@@ -37,13 +37,13 @@ namespace XrdSsiPb {
* Process the CLOSEW workflow event
*
* @param[in,out] scheduler CTA Scheduler to queue this request
* @param[in,out] lc Log Context for this request
* @param[in] cli
_username Client username,
from authentication key for the request
* @param[in,out] lc
CTA
Log Context for this request
* @param[in] cli
ent XRootD Client Entity, taken
from authentication key for the request
* @param[in] notification EOS WFE Notification message
* @param[out] response Response message to return to EOS
*/
static
void
requestProcCLOSEW
(
cta
::
Scheduler
&
scheduler
,
cta
::
log
::
LogContext
&
lc
,
const
std
::
string
&
cli_username
,
const
eos
::
wfe
::
Notification
&
notification
,
eos
::
wfe
::
Response
&
response
)
static
void
requestProcCLOSEW
(
cta
::
Scheduler
&
scheduler
,
cta
::
log
::
LogContext
&
lc
,
XrdSsiEntity
&
client
,
const
eos
::
wfe
::
Notification
&
notification
,
eos
::
wfe
::
Response
&
response
)
{
// Unpack message
...
...
@@ -69,7 +69,7 @@ static void requestProcCLOSEW(cta::Scheduler &scheduler, cta::log::LogContext &l
// Queue the request
uint64_t
archiveFileId
=
scheduler
.
queueArchive
(
cli
_user
name
,
request
,
lc
);
uint64_t
archiveFileId
=
scheduler
.
queueArchive
(
cli
ent
.
name
,
request
,
lc
);
// Set archiveFileId in response (deprecated)
...
...
@@ -91,12 +91,12 @@ static void requestProcCLOSEW(cta::Scheduler &scheduler, cta::log::LogContext &l
*
* @param[in,out] scheduler CTA Scheduler to queue this request
* @param[in,out] lc Log Context for this request
* @param[in] cli
_username Client username,
from authentication key for the request
* @param[in] cli
ent XRootD Client Entity, taken
from authentication key for the request
* @param[in] notification EOS WFE Notification message
* @param[out] response Response message to return to EOS
*/
static
void
requestProcPREPARE
(
cta
::
Scheduler
&
scheduler
,
cta
::
log
::
LogContext
&
lc
,
const
std
::
string
&
cli_username
,
const
eos
::
wfe
::
Notification
&
notification
,
eos
::
wfe
::
Response
&
response
)
static
void
requestProcPREPARE
(
cta
::
Scheduler
&
scheduler
,
cta
::
log
::
LogContext
&
lc
,
XrdSsiEntity
&
client
,
const
eos
::
wfe
::
Notification
&
notification
,
eos
::
wfe
::
Response
&
response
)
{
// Unpack message
...
...
@@ -125,7 +125,7 @@ static void requestProcPREPARE(cta::Scheduler &scheduler, cta::log::LogContext &
// Queue the request
scheduler
.
queueRetrieve
(
cli
_user
name
,
request
,
lc
);
scheduler
.
queueRetrieve
(
cli
ent
.
name
,
request
,
lc
);
// Set response type
...
...
@@ -139,58 +139,78 @@ static void requestProcPREPARE(cta::Scheduler &scheduler, cta::log::LogContext &
*
* @param[in,out] scheduler CTA Scheduler to queue this request
* @param[in,out] lc Log Context for this request
* @param[in] cli
_username Client username,
from authentication key for the request
* @param[in] cli
ent XRootD Client Entity, taken
from authentication key for the request
* @param[in] notification EOS WFE Notification message
* @param[out] response Response message to return to EOS
*/
#if 0
static void requestProcDELETE(cta::Scheduler &scheduler, cta::log::LogContext &lc,
const std::string &cli_username, const eos::wfe::Notification ¬ification, eos::wfe::Response &response)
static
void
requestProcDELETE
(
cta
::
Scheduler
&
scheduler
,
cta
::
log
::
LogContext
&
lc
,
XrdSsiEntity
&
client
,
const
eos
::
wfe
::
Notification
&
notification
,
eos
::
wfe
::
Response
&
response
)
{
cta::common::dataStructures::UserIdentity originator;
originator.name=user.value();
originator.group=group.value();
cta::common::dataStructures::DeleteArchiveRequest request;
request.archiveFileID=id.value();
request.requester=originator;
const cta::common::dataStructures::ArchiveFile archiveFile = m_scheduler->deleteArchive(m_cliIdentity.username, request);
std::list<cta::log::Param> params;
params.push_back(cta::log::Param("USERNAME", m_cliIdentity.username));
params.push_back(cta::log::Param("HOST", m_cliIdentity.host));
params.push_back(cta::log::Param("fileId", std::to_string(archiveFile.archiveFileID)));
params.push_back(cta::log::Param("diskInstance", archiveFile.diskInstance));
params.push_back(cta::log::Param("diskFileId", archiveFile.diskFileId));
params.push_back(cta::log::Param("diskFileInfo.path", archiveFile.diskFileInfo.path));
params.push_back(cta::log::Param("diskFileInfo.owner", archiveFile.diskFileInfo.owner));
params.push_back(cta::log::Param("diskFileInfo.group", archiveFile.diskFileInfo.group));
params.push_back(cta::log::Param("diskFileInfo.recoveryBlob", archiveFile.diskFileInfo.recoveryBlob));
params.push_back(cta::log::Param("fileSize", std::to_string(archiveFile.fileSize)));
params.push_back(cta::log::Param("checksumType", archiveFile.checksumType));
params.push_back(cta::log::Param("checksumValue", archiveFile.checksumValue));
params.push_back(cta::log::Param("creationTime", std::to_string(archiveFile.creationTime)));
params.push_back(cta::log::Param("reconciliationTime", std::to_string(archiveFile.reconciliationTime)));
params.push_back(cta::log::Param("storageClass", archiveFile.storageClass));
for(auto it=archiveFile.tapeFiles.begin(); it!=archiveFile.tapeFiles.end(); it++) {
std::stringstream tapeCopyLogStream;
tapeCopyLogStream << "copy number: " << it->first
<< " vid: " << it->second.vid
<< " fSeq: " << it->second.fSeq
<< " blockId: " << it->second.blockId
<< " creationTime: " << it->second.creationTime
<< " compressedSize: " << it->second.compressedSize
<< " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field
<< " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field
<< " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field
params.push_back(cta::log::Param("TAPE FILE", tapeCopyLogStream.str()));
}
m_log(log::INFO, "Archive File Deleted", params);
return cmdlineOutput.str();
// Unpack message
cta
::
common
::
dataStructures
::
UserIdentity
originator
;
originator
.
name
=
notification
.
cli
().
user
().
username
();
originator
.
group
=
notification
.
cli
().
user
().
groupname
();
cta
::
common
::
dataStructures
::
DiskFileInfo
diskFileInfo
;
diskFileInfo
.
owner
=
notification
.
file
().
owner
().
username
();
diskFileInfo
.
group
=
notification
.
file
().
owner
().
groupname
();
diskFileInfo
.
path
=
notification
.
file
().
lpath
();
cta
::
common
::
dataStructures
::
DeleteArchiveRequest
request
;
request
.
requester
=
originator
;
// CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
// must be converted to a valid uint64_t
std
::
string
archiveFileIdStr
=
notification
.
file
().
xattr
().
at
(
"CTA_ArchiveFileId"
);
if
((
request
.
archiveFileID
=
strtoul
(
archiveFileIdStr
.
c_str
(),
nullptr
,
10
))
==
0
)
{
throw
PbException
(
"Invalid archiveFileID "
+
archiveFileIdStr
);
}
// Queue the request
const
cta
::
common
::
dataStructures
::
ArchiveFile
archiveFile
=
scheduler
.
deleteArchive
(
client
.
name
,
request
);
// Create a log entry
std
::
list
<
cta
::
log
::
Param
>
params
;
params
.
push_back
(
cta
::
log
::
Param
(
"USERNAME"
,
client
.
name
));
params
.
push_back
(
cta
::
log
::
Param
(
"HOST"
,
client
.
host
));
params
.
push_back
(
cta
::
log
::
Param
(
"fileId"
,
std
::
to_string
(
archiveFile
.
archiveFileID
)));
params
.
push_back
(
cta
::
log
::
Param
(
"diskInstance"
,
archiveFile
.
diskInstance
));
params
.
push_back
(
cta
::
log
::
Param
(
"diskFileId"
,
archiveFile
.
diskFileId
));
params
.
push_back
(
cta
::
log
::
Param
(
"diskFileInfo.path"
,
archiveFile
.
diskFileInfo
.
path
));
params
.
push_back
(
cta
::
log
::
Param
(
"diskFileInfo.owner"
,
archiveFile
.
diskFileInfo
.
owner
));
params
.
push_back
(
cta
::
log
::
Param
(
"diskFileInfo.group"
,
archiveFile
.
diskFileInfo
.
group
));
params
.
push_back
(
cta
::
log
::
Param
(
"diskFileInfo.recoveryBlob"
,
archiveFile
.
diskFileInfo
.
recoveryBlob
));
params
.
push_back
(
cta
::
log
::
Param
(
"fileSize"
,
std
::
to_string
(
archiveFile
.
fileSize
)));
params
.
push_back
(
cta
::
log
::
Param
(
"checksumType"
,
archiveFile
.
checksumType
));
params
.
push_back
(
cta
::
log
::
Param
(
"checksumValue"
,
archiveFile
.
checksumValue
));
params
.
push_back
(
cta
::
log
::
Param
(
"creationTime"
,
std
::
to_string
(
archiveFile
.
creationTime
)));
params
.
push_back
(
cta
::
log
::
Param
(
"reconciliationTime"
,
std
::
to_string
(
archiveFile
.
reconciliationTime
)));
params
.
push_back
(
cta
::
log
::
Param
(
"storageClass"
,
archiveFile
.
storageClass
));
for
(
auto
it
=
archiveFile
.
tapeFiles
.
begin
();
it
!=
archiveFile
.
tapeFiles
.
end
();
it
++
)
{
std
::
stringstream
tapeCopyLogStream
;
tapeCopyLogStream
<<
"copy number: "
<<
it
->
first
<<
" vid: "
<<
it
->
second
.
vid
<<
" fSeq: "
<<
it
->
second
.
fSeq
<<
" blockId: "
<<
it
->
second
.
blockId
<<
" creationTime: "
<<
it
->
second
.
creationTime
<<
" compressedSize: "
<<
it
->
second
.
compressedSize
<<
" checksumType: "
<<
it
->
second
.
checksumType
//this shouldn't be here: repeated field
<<
" checksumValue: "
<<
it
->
second
.
checksumValue
//this shouldn't be here: repeated field
<<
" copyNb: "
<<
it
->
second
.
copyNb
;
//this shouldn't be here: repeated field
params
.
push_back
(
cta
::
log
::
Param
(
"TAPE FILE"
,
tapeCopyLogStream
.
str
()));
}
lc
.
logger
()(
cta
::
log
::
INFO
,
"Archive File Deleted"
,
params
);
// Set response type
response
.
set_type
(
eos
::
wfe
::
Response
::
RSP_SUCCESS
);
}
#endif
...
...
@@ -228,9 +248,9 @@ void RequestProc<eos::wfe::Notification, eos::wfe::Response, eos::wfe::Alert>::E
OutputJsonString
(
std
::
cerr
,
&
m_request
);
#endif
// Get
client username
// Get
XRootD client entity
std
::
string
cli_username
=
getClient
(
m_resource
);
auto
client
=
getClient
(
m_resource
);
// Get CTA Scheduler and Log Context
...
...
@@ -244,12 +264,13 @@ void RequestProc<eos::wfe::Notification, eos::wfe::Response, eos::wfe::Alert>::E
using
namespace
eos
::
wfe
;
case
Workflow
::
CLOSEW
:
requestProcCLOSEW
(
scheduler
,
lc
,
cli
_username
,
m_request
,
m_metadata
);
break
;
requestProcCLOSEW
(
scheduler
,
lc
,
cli
ent
,
m_request
,
m_metadata
);
break
;
case
Workflow
::
PREPARE
:
requestProcPREPARE
(
scheduler
,
lc
,
cli
_username
,
m_request
,
m_metadata
);
break
;
requestProcPREPARE
(
scheduler
,
lc
,
cli
ent
,
m_request
,
m_metadata
);
break
;
case
Workflow
::
DELETE
:
requestProcDELETE
(
scheduler
,
lc
,
client
,
m_request
,
m_metadata
);
break
;
default:
throw
PbException
(
"Workflow Event type "
+
std
::
to_string
(
m_request
.
wf
().
event
())
+
" is not supported."
);
...
...
This diff is collapsed.
Click to expand it.
xroot_ssi_pb/XrdSsiPbResource.hpp
+
4
−
8
View file @
ebf151d7
...
...
@@ -45,7 +45,7 @@ namespace XrdSsiPb {
* tident const char* Trace identifier (always preset)
*/
static
const
char
*
getClient
(
const
XrdSsiResource
&
resource
)
static
const
XrdSsiEntity
&
getClient
(
const
XrdSsiResource
&
resource
)
{
#ifdef XRDSSI_DEBUG
std
::
cerr
<<
"[DEBUG] Resource name: "
<<
resource
.
rName
<<
std
::
endl
...
...
@@ -69,16 +69,12 @@ static const char *getClient(const XrdSsiResource &resource)
<<
std
::
endl
;
#endif
if
(
resource
.
client
==
nullptr
||
resource
.
client
->
name
==
nullptr
)
if
(
resource
.
client
==
nullptr
)
{
throw
XrdSsiException
(
"getClient(): resource.client
contain
s a null pointer"
);
throw
XrdSsiException
(
"getClient(): resource.client
i
s a null pointer"
);
}
#ifdef XRDSSI_DEBUG
std
::
cerr
<<
"resource.client->name = "
<<
resource
.
client
->
name
<<
std
::
endl
;
#endif
return
resource
.
client
->
name
;
return
*
(
resource
.
client
);
}
}
// namespace XrdSsiPb
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment