Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta-dcache-frontend
Commits
4a9cd0ba
Commit
4a9cd0ba
authored
Nov 23, 2021
by
Tigran Mkrtchyan
☕
Browse files
cta-dcache: do not share LogContext between requests
parent
a6b971e8
Changes
2
Hide whitespace changes
Inline
Side-by-side
FrontendGRpcSvc.cpp
View file @
4a9cd0ba
...
...
@@ -31,14 +31,16 @@ Status CtaRpcImpl::Version(::grpc::ServerContext *context, const ::google::proto
Status
CtaRpcImpl
::
Archive
(
::
grpc
::
ServerContext
*
context
,
const
::
cta
::
dcache
::
rpc
::
ArchiveRequest
*
request
,
::
cta
::
dcache
::
rpc
::
ArchiveResponse
*
response
)
{
m_log
->
log
(
cta
::
log
::
INFO
,
"Archive request"
);
cta
::
log
::
LogContext
lc
(
*
m_log
);
cta
::
log
::
ScopedParamContainer
sp
(
lc
);
lc
.
log
(
cta
::
log
::
INFO
,
"Archive request"
);
cta
::
log
::
ScopedParamContainer
sp
(
*
m_log
);
sp
.
add
(
"remoteHost"
,
context
->
peer
());
sp
.
add
(
"request"
,
"archive"
);
const
std
::
string
storageClass
=
request
->
file
().
storageclass
();
m_log
->
log
(
cta
::
log
::
DEBUG
,
"Archive request for storageClass: "
+
storageClass
);
lc
.
log
(
cta
::
log
::
DEBUG
,
"Archive request for storageClass: "
+
storageClass
);
cta
::
common
::
dataStructures
::
RequesterIdentity
requester
;
requester
.
name
=
request
->
cli
().
user
().
username
();
...
...
@@ -54,7 +56,7 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::
sp
.
add
(
"fileID"
,
request
->
file
().
fid
());
try
{
uint64_t
archiveFileId
=
m_scheduler
->
checkAndGetNextArchiveFileId
(
instance
,
storageClass
,
requester
,
*
m_log
);
uint64_t
archiveFileId
=
m_scheduler
->
checkAndGetNextArchiveFileId
(
instance
,
storageClass
,
requester
,
lc
);
sp
.
add
(
"archiveID"
,
archiveFileId
);
cta
::
common
::
dataStructures
::
ArchiveRequest
archiveRequest
;
...
...
@@ -74,10 +76,10 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::
archiveRequest
.
creationLog
.
username
=
instance
;
archiveRequest
.
creationLog
.
time
=
time
(
nullptr
);
std
::
string
reqId
=
m_scheduler
->
queueArchiveWithGivenId
(
archiveFileId
,
instance
,
archiveRequest
,
*
m_log
);
std
::
string
reqId
=
m_scheduler
->
queueArchiveWithGivenId
(
archiveFileId
,
instance
,
archiveRequest
,
lc
);
sp
.
add
(
"reqId"
,
reqId
);
m_log
->
log
(
cta
::
log
::
INFO
,
"Archive request for storageClass: "
+
storageClass
lc
.
log
(
cta
::
log
::
INFO
,
"Archive request for storageClass: "
+
storageClass
+
" archiveFileId: "
+
std
::
to_string
(
archiveFileId
)
+
" RequestID: "
+
reqId
);
...
...
@@ -85,7 +87,7 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::
response
->
set_reqid
(
reqId
);
}
catch
(
cta
::
exception
::
Exception
&
ex
)
{
m_log
->
log
(
cta
::
log
::
CRIT
,
ex
.
getMessageValue
());
lc
.
log
(
cta
::
log
::
CRIT
,
ex
.
getMessageValue
());
return
::
grpc
::
Status
(
::
grpc
::
StatusCode
::
INTERNAL
,
ex
.
getMessageValue
());
}
...
...
@@ -95,10 +97,12 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::
Status
CtaRpcImpl
::
Delete
(
::
grpc
::
ServerContext
*
context
,
const
::
cta
::
dcache
::
rpc
::
DeleteRequest
*
request
,
::
google
::
protobuf
::
Empty
*
response
)
{
cta
::
log
::
ScopedParamContainer
sp
(
*
m_log
);
cta
::
log
::
LogContext
lc
(
*
m_log
);
cta
::
log
::
ScopedParamContainer
sp
(
lc
);
sp
.
add
(
"remoteHost"
,
context
->
peer
());
m_log
->
log
(
cta
::
log
::
DEBUG
,
"Delete request"
);
lc
.
log
(
cta
::
log
::
DEBUG
,
"Delete request"
);
sp
.
add
(
"request"
,
"delete"
);
auto
instance
=
request
->
instance
().
name
();
...
...
@@ -124,12 +128,12 @@ Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::r
try
{
deleteRequest
.
archiveFile
=
m_catalogue
->
getArchiveFileById
(
deleteRequest
.
archiveFileID
);
}
catch
(
cta
::
exception
::
Exception
&
ex
){
m_log
->
log
(
cta
::
log
::
CRIT
,
ex
.
getMessageValue
());
lc
.
log
(
cta
::
log
::
CRIT
,
ex
.
getMessageValue
());
return
::
grpc
::
Status
(
::
grpc
::
StatusCode
::
INTERNAL
,
ex
.
getMessageValue
());
}
m_scheduler
->
deleteArchive
(
instance
,
deleteRequest
,
*
m_log
);
m_scheduler
->
deleteArchive
(
instance
,
deleteRequest
,
lc
);
m_log
->
log
(
cta
::
log
::
INFO
,
"archive file deleted."
);
lc
.
log
(
cta
::
log
::
INFO
,
"archive file deleted."
);
return
Status
::
OK
;
}
...
...
@@ -137,12 +141,14 @@ Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::r
Status
CtaRpcImpl
::
Retrieve
(
::
grpc
::
ServerContext
*
context
,
const
::
cta
::
dcache
::
rpc
::
RetrieveRequest
*
request
,
::
cta
::
dcache
::
rpc
::
RetrieveResponse
*
response
)
{
cta
::
log
::
ScopedParamContainer
sp
(
*
m_log
);
cta
::
log
::
LogContext
lc
(
*
m_log
);
cta
::
log
::
ScopedParamContainer
sp
(
lc
);
sp
.
add
(
"remoteHost"
,
context
->
peer
());
sp
.
add
(
"request"
,
"retrieve"
);
const
std
::
string
storageClass
=
request
->
file
().
storageclass
();
m_log
->
log
(
cta
::
log
::
DEBUG
,
"Retrieve request for storageClass: "
+
storageClass
);
lc
.
log
(
cta
::
log
::
DEBUG
,
"Retrieve request for storageClass: "
+
storageClass
);
auto
instance
=
request
->
instance
().
name
();
...
...
@@ -174,9 +180,9 @@ Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache:
cta
::
utils
::
Timer
t
;
// Queue the request
std
::
string
reqId
=
m_scheduler
->
queueRetrieve
(
instance
,
retrieveRequest
,
*
m_log
);
std
::
string
reqId
=
m_scheduler
->
queueRetrieve
(
instance
,
retrieveRequest
,
lc
);
sp
.
add
(
"reqId"
,
reqId
);
m_log
->
log
(
cta
::
log
::
INFO
,
"Retrieve request for storageClass: "
+
storageClass
lc
.
log
(
cta
::
log
::
INFO
,
"Retrieve request for storageClass: "
+
storageClass
+
" archiveFileId: "
+
std
::
to_string
(
retrieveRequest
.
archiveFileID
)
+
" RequestID: "
+
reqId
);
...
...
@@ -195,13 +201,14 @@ void CtaRpcImpl::run(const std::string server_address) {
builder
.
RegisterService
(
this
);
std
::
unique_ptr
<
Server
>
server
(
builder
.
BuildAndStart
());
m_log
->
log
(
cta
::
log
::
INFO
,
"Listening on socket address: "
+
server_address
);
cta
::
log
::
LogContext
lc
(
*
m_log
);
lc
.
log
(
cta
::
log
::
INFO
,
"Listening on socket address: "
+
server_address
);
server
->
Wait
();
}
CtaRpcImpl
::
CtaRpcImpl
(
cta
::
log
::
Log
Context
*
lc
,
std
::
unique_ptr
<
cta
::
catalogue
::
Catalogue
>
&
catalogue
,
std
::
unique_ptr
<
cta
::
Scheduler
>
&
scheduler
)
:
CtaRpcImpl
::
CtaRpcImpl
(
cta
::
log
::
Log
ger
*
logger
,
std
::
unique_ptr
<
cta
::
catalogue
::
Catalogue
>
&
catalogue
,
std
::
unique_ptr
<
cta
::
Scheduler
>
&
scheduler
)
:
m_catalogue
(
std
::
move
(
catalogue
)),
m_scheduler
(
std
::
move
(
scheduler
))
{
m_log
=
l
c
;
m_log
=
l
ogger
;
}
using
namespace
cta
;
...
...
@@ -271,8 +278,8 @@ int main(const int argc, char *const *const argv) {
}
std
::
unique_ptr
<
cta
::
log
::
Logger
>
logger
=
std
::
unique_ptr
<
cta
::
log
::
Logger
>
(
new
log
::
StdoutLogger
(
shortHostName
,
"cta-dcache"
,
shortHeader
)
)
;
log
::
LogContext
lc
(
*
logger
);
log
::
StdoutL
ogger
l
ogger
(
shortHostName
,
"cta-dcache"
,
shortHeader
);
log
::
LogContext
lc
(
logger
);
lc
.
log
(
log
::
INFO
,
"Starting "
+
CTA_DCACHE_VERSION
);
...
...
@@ -286,7 +293,7 @@ int main(const int argc, char *const *const argv) {
const
rdbms
::
Login
catalogueLogin
=
rdbms
::
Login
::
parseFile
(
catalogueConfigFile
);
const
uint64_t
nbArchiveFileListingConns
=
2
;
auto
catalogueFactory
=
catalogue
::
CatalogueFactoryFactory
::
create
(
*
logger
,
catalogueLogin
,
auto
catalogueFactory
=
catalogue
::
CatalogueFactoryFactory
::
create
(
logger
,
catalogueLogin
,
10
,
nbArchiveFileListingConns
);
auto
catalogue
=
catalogueFactory
->
create
();
...
...
@@ -303,11 +310,11 @@ int main(const int argc, char *const *const argv) {
auto
backed
=
config
.
getConfEntString
(
"ObjectStore"
,
"BackendPath"
);
lc
.
log
(
log
::
INFO
,
"Using scheduler backend: "
+
backed
);
auto
sInit
=
cta
::
make_unique
<
SchedulerDBInit_t
>
(
"Frontend"
,
backed
,
*
logger
);
auto
scheddb
=
sInit
->
getSchedDB
(
*
catalogue
,
*
logger
);
auto
sInit
=
cta
::
make_unique
<
SchedulerDBInit_t
>
(
"Frontend"
,
backed
,
logger
);
auto
scheddb
=
sInit
->
getSchedDB
(
*
catalogue
,
logger
);
scheddb
->
setBottomHalfQueueSize
(
25000
);
auto
scheduler
=
cta
::
make_unique
<
cta
::
Scheduler
>
(
*
catalogue
,
*
scheddb
,
5
,
2
*
1000
*
1000
);
CtaRpcImpl
svc
(
&
l
c
,
catalogue
,
scheduler
);
CtaRpcImpl
svc
(
&
l
ogger
,
catalogue
,
scheduler
);
svc
.
run
(
server_address
);
}
FrontendGRpcSvc.h
View file @
4a9cd0ba
...
...
@@ -38,10 +38,10 @@ private:
std
::
unique_ptr
<
cta
::
SchedulerDB_t
>
m_scheddb
;
std
::
unique_ptr
<
cta
::
SchedulerDBInit_t
>
m_scheddb_init
;
std
::
unique_ptr
<
cta
::
Scheduler
>
m_scheduler
;
cta
::
log
::
Log
Context
*
m_log
;
cta
::
log
::
Log
ger
*
m_log
;
public:
CtaRpcImpl
(
cta
::
log
::
Log
Context
*
lc
,
std
::
unique_ptr
<
cta
::
catalogue
::
Catalogue
>
&
catalogue
,
std
::
unique_ptr
<
cta
::
Scheduler
>
&
scheduler
);
CtaRpcImpl
(
cta
::
log
::
Log
ger
*
logger
,
std
::
unique_ptr
<
cta
::
catalogue
::
Catalogue
>
&
catalogue
,
std
::
unique_ptr
<
cta
::
Scheduler
>
&
scheduler
);
void
run
(
const
std
::
string
server_address
);
Status
Version
(
::
grpc
::
ServerContext
*
context
,
const
::
google
::
protobuf
::
Empty
*
request
,
::
cta
::
admin
::
Version
*
response
);
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment