From 5016a1e53d881e02543e98151d64d4efe15d11e8 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de> Date: Fri, 1 Oct 2021 14:46:22 +0200 Subject: [PATCH] frontend-grpc: update protobuf messages to include request id --- frontend_svc/gRPC/FrontendGRpcSvc.cpp | 25 ++++--- frontend_svc/gRPC/FrontendGRpcSvc.h | 6 +- .../grpc-proto/protobuf/cta_rpc.proto | 65 ++++++++++++------- 3 files changed, 57 insertions(+), 39 deletions(-) diff --git a/frontend_svc/gRPC/FrontendGRpcSvc.cpp b/frontend_svc/gRPC/FrontendGRpcSvc.cpp index c604f93a1c..c4d77be902 100644 --- a/frontend_svc/gRPC/FrontendGRpcSvc.cpp +++ b/frontend_svc/gRPC/FrontendGRpcSvc.cpp @@ -25,7 +25,7 @@ Status CtaRpcImpl::Version(::grpc::ServerContext *context, const ::google::proto return Status::OK; } -Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response) { +Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveResponse* response) { m_log->log(cta::log::INFO, "Archive request"); @@ -58,12 +58,13 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache:: archiveRequest.creationLog.username = instance; archiveRequest.creationLog.time = time(nullptr); - std::string archiveRequestAddr = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, *m_log); + std::string reqId = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, *m_log); m_log->log(cta::log::INFO, "Archive request for storageClass: " + storageClass + " archiveFileId: " + std::to_string(archiveFileId) - + "RequestID: " + archiveRequestAddr); + + "RequestID: " + reqId); response->set_fid(archiveFileId); + response->set_reqid(reqId); } catch (cta::exception::Exception &ex) { m_log->log(cta::log::CRIT, ex.getMessageValue()); @@ -74,7 +75,7 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache:: } -Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response) { +Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty* response) { m_log->log(cta::log::DEBUG, "Delete request"); auto instance = request->instance().name(); @@ -87,7 +88,7 @@ Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::r deleteRequest.diskFileId = request->file().fid(); deleteRequest.diskInstance = instance; - deleteRequest.archiveFileID = request->archiveid().fid(); + deleteRequest.archiveFileID = request->archiveid(); // Delete the file from the catalogue or from the objectstore if archive request is created cta::utils::Timer t; @@ -103,7 +104,7 @@ Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::r return Status::OK; } -Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response) { +Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::cta::dcache::rpc::RetrieveResponse *response) { const std::string storageClass = request->file().storageclass(); @@ -125,21 +126,17 @@ Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache: retrieveRequest.creationLog.time = time(nullptr); retrieveRequest.isVerifyOnly = false; - retrieveRequest.archiveFileID = request->archiveid().fid(); + retrieveRequest.archiveFileID = request->archiveid(); cta::utils::Timer t; // Queue the request - std::string retrieveReqId = m_scheduler->queueRetrieve(instance, retrieveRequest, *m_log); + std::string reqId = m_scheduler->queueRetrieve(instance, retrieveRequest, *m_log); m_log->log(cta::log::INFO, "Retrieve request for storageClass: " + storageClass + " archiveFileId: " + std::to_string(retrieveRequest.archiveFileID) - + "RequestID: " + retrieveReqId); - - // TODO: we need to keep retrieveReqId to cancel request, if needed - // Set response type and add retrieve request reference as an extended attribute. - //response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId)); - //response.set_type(cta::xrd::Response::RSP_SUCCESS); + + "RequestID: " + reqId); + response->set_reqid(reqId); return Status::OK; } diff --git a/frontend_svc/gRPC/FrontendGRpcSvc.h b/frontend_svc/gRPC/FrontendGRpcSvc.h index e9bb634bca..ec85ac1997 100644 --- a/frontend_svc/gRPC/FrontendGRpcSvc.h +++ b/frontend_svc/gRPC/FrontendGRpcSvc.h @@ -45,9 +45,9 @@ public: void run(const std::string server_address); Status Version(::grpc::ServerContext *context, const ::google::protobuf::Empty *request, ::cta::admin::Version *response); - Status Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response); - Status Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response); - Status Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response); + Status Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveResponse* response); + Status Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::cta::dcache::rpc::RetrieveResponse* response); + Status Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty* response); }; #endif //CTA_FRONTENDGRPCSVC_H diff --git a/frontend_svc/grpc-proto/protobuf/cta_rpc.proto b/frontend_svc/grpc-proto/protobuf/cta_rpc.proto index ffde5c0198..ad58d8e5f7 100644 --- a/frontend_svc/grpc-proto/protobuf/cta_rpc.proto +++ b/frontend_svc/grpc-proto/protobuf/cta_rpc.proto @@ -15,45 +15,66 @@ import "cta_eos.proto"; // gRPC interface to CTA frontend // +/* + * File metadata + */ message Metadata0 { - string fid = 1; // pnfsid - uint64 size = 2; - string storageClass = 3; - cta.common.ChecksumBlob csb = 4; + string fid = 1; // disk system unique file ID + uint64 size = 2; // file size + string storageClass = 3; // tape system related storage class (file family) + cta.common.ChecksumBlob csb = 4; // set of knows checksums for the given file } -message ArchiveFileId { - uint64 fid = 1; +/* + * Response to the ARCHIVE request. + */ +message ArchiveResponse { + uint64 fid = 1; // tape system unique file ID + string reqId = 2; // tape request scheduler ID, used to cancel the request } +/* + * Response to the RETRIEVE request. + */ +message RetrieveResponse { + string reqId = 1; // tape request scheduler ID, used to cancel the request +} + + +/* + * ARCHIVE request. + */ message ArchiveRequest { - cta.common.Service instance = 1; //< caller instance id - cta.eos.Client cli = 2; //< client information - cta.eos.Transport transport = 3; //< transport - Metadata0 file = 4; //< file meta data + cta.common.Service instance = 1; // client instance ID + cta.eos.Client cli = 2; // requester information + cta.eos.Transport transport = 3; // IO, error and success endpoints + Metadata0 file = 4; // files' metadata } +/* + * ARCHIVE request. + */ message RetrieveRequest { - cta.common.Service instance = 1; //< caller instance id - cta.eos.Client cli = 2; //< client information - cta.eos.Transport transport = 3; //< transport - Metadata0 file = 4; //< file meta data - ArchiveFileId archiveId = 5; + cta.common.Service instance = 1; // client instance ID + cta.eos.Client cli = 2; // requester information + cta.eos.Transport transport = 3; // IO, error and success endpoints + Metadata0 file = 4; // files' metadata + uint64 archiveId = 5; // tape system unique file ID } message DeleteRequest { - cta.common.Service instance = 1; //< caller instance id - cta.eos.Client cli = 2; //< client information - cta.eos.Transport transport = 3; //< transport - Metadata0 file = 4; //< file meta data - ArchiveFileId archiveId = 5; + cta.common.Service instance = 1; // client instance ID + cta.eos.Client cli = 2; // requester information + cta.eos.Transport transport = 3; // IO, error and success endpoints + Metadata0 file = 4; // files' metadata + uint64 archiveId = 5; // tape system unique file ID } service CtaRpc { rpc Version (google.protobuf.Empty) returns (cta.admin.Version) {} - rpc Archive (ArchiveRequest) returns (ArchiveFileId) {} - rpc Retrieve (RetrieveRequest) returns (google.protobuf.Empty) {} + rpc Archive (ArchiveRequest) returns (ArchiveResponse) {} + rpc Retrieve (RetrieveRequest) returns (RetrieveResponse) {} rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {} } \ No newline at end of file -- GitLab