Skip to content
Snippets Groups Projects
Commit 5016a1e5 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan :coffee:
Browse files

frontend-grpc: update protobuf messages to include request id

parent c8b01690
No related branches found
No related tags found
No related merge requests found
......@@ -25,7 +25,7 @@ Status CtaRpcImpl::Version(::grpc::ServerContext *context, const ::google::proto
return Status::OK;
}
Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response) {
Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveResponse* response) {
m_log->log(cta::log::INFO, "Archive request");
......@@ -58,12 +58,13 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::
archiveRequest.creationLog.username = instance;
archiveRequest.creationLog.time = time(nullptr);
std::string archiveRequestAddr = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, *m_log);
std::string reqId = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, *m_log);
m_log->log(cta::log::INFO, "Archive request for storageClass: " + storageClass
+ " archiveFileId: " + std::to_string(archiveFileId)
+ "RequestID: " + archiveRequestAddr);
+ "RequestID: " + reqId);
response->set_fid(archiveFileId);
response->set_reqid(reqId);
} catch (cta::exception::Exception &ex) {
m_log->log(cta::log::CRIT, ex.getMessageValue());
......@@ -74,7 +75,7 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::
}
Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response) {
Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty* response) {
m_log->log(cta::log::DEBUG, "Delete request");
auto instance = request->instance().name();
......@@ -87,7 +88,7 @@ Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::r
deleteRequest.diskFileId = request->file().fid();
deleteRequest.diskInstance = instance;
deleteRequest.archiveFileID = request->archiveid().fid();
deleteRequest.archiveFileID = request->archiveid();
// Delete the file from the catalogue or from the objectstore if archive request is created
cta::utils::Timer t;
......@@ -103,7 +104,7 @@ Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::r
return Status::OK;
}
Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response) {
Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::cta::dcache::rpc::RetrieveResponse *response) {
const std::string storageClass = request->file().storageclass();
......@@ -125,21 +126,17 @@ Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache:
retrieveRequest.creationLog.time = time(nullptr);
retrieveRequest.isVerifyOnly = false;
retrieveRequest.archiveFileID = request->archiveid().fid();
retrieveRequest.archiveFileID = request->archiveid();
cta::utils::Timer t;
// Queue the request
std::string retrieveReqId = m_scheduler->queueRetrieve(instance, retrieveRequest, *m_log);
std::string reqId = m_scheduler->queueRetrieve(instance, retrieveRequest, *m_log);
m_log->log(cta::log::INFO, "Retrieve request for storageClass: " + storageClass
+ " archiveFileId: " + std::to_string(retrieveRequest.archiveFileID)
+ "RequestID: " + retrieveReqId);
// TODO: we need to keep retrieveReqId to cancel request, if needed
// Set response type and add retrieve request reference as an extended attribute.
//response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId));
//response.set_type(cta::xrd::Response::RSP_SUCCESS);
+ "RequestID: " + reqId);
response->set_reqid(reqId);
return Status::OK;
}
......
......@@ -45,9 +45,9 @@ public:
void run(const std::string server_address);
Status Version(::grpc::ServerContext *context, const ::google::protobuf::Empty *request, ::cta::admin::Version *response);
Status Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response);
Status Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response);
Status Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response);
Status Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveResponse* response);
Status Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::cta::dcache::rpc::RetrieveResponse* response);
Status Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty* response);
};
#endif //CTA_FRONTENDGRPCSVC_H
......@@ -15,45 +15,66 @@ import "cta_eos.proto";
// gRPC interface to CTA frontend
//
/*
* File metadata
*/
message Metadata0 {
string fid = 1; // pnfsid
uint64 size = 2;
string storageClass = 3;
cta.common.ChecksumBlob csb = 4;
string fid = 1; // disk system unique file ID
uint64 size = 2; // file size
string storageClass = 3; // tape system related storage class (file family)
cta.common.ChecksumBlob csb = 4; // set of knows checksums for the given file
}
message ArchiveFileId {
uint64 fid = 1;
/*
* Response to the ARCHIVE request.
*/
message ArchiveResponse {
uint64 fid = 1; // tape system unique file ID
string reqId = 2; // tape request scheduler ID, used to cancel the request
}
/*
* Response to the RETRIEVE request.
*/
message RetrieveResponse {
string reqId = 1; // tape request scheduler ID, used to cancel the request
}
/*
* ARCHIVE request.
*/
message ArchiveRequest {
cta.common.Service instance = 1; //< caller instance id
cta.eos.Client cli = 2; //< client information
cta.eos.Transport transport = 3; //< transport
Metadata0 file = 4; //< file meta data
cta.common.Service instance = 1; // client instance ID
cta.eos.Client cli = 2; // requester information
cta.eos.Transport transport = 3; // IO, error and success endpoints
Metadata0 file = 4; // files' metadata
}
/*
* ARCHIVE request.
*/
message RetrieveRequest {
cta.common.Service instance = 1; //< caller instance id
cta.eos.Client cli = 2; //< client information
cta.eos.Transport transport = 3; //< transport
Metadata0 file = 4; //< file meta data
ArchiveFileId archiveId = 5;
cta.common.Service instance = 1; // client instance ID
cta.eos.Client cli = 2; // requester information
cta.eos.Transport transport = 3; // IO, error and success endpoints
Metadata0 file = 4; // files' metadata
uint64 archiveId = 5; // tape system unique file ID
}
message DeleteRequest {
cta.common.Service instance = 1; //< caller instance id
cta.eos.Client cli = 2; //< client information
cta.eos.Transport transport = 3; //< transport
Metadata0 file = 4; //< file meta data
ArchiveFileId archiveId = 5;
cta.common.Service instance = 1; // client instance ID
cta.eos.Client cli = 2; // requester information
cta.eos.Transport transport = 3; // IO, error and success endpoints
Metadata0 file = 4; // files' metadata
uint64 archiveId = 5; // tape system unique file ID
}
service CtaRpc {
rpc Version (google.protobuf.Empty) returns (cta.admin.Version) {}
rpc Archive (ArchiveRequest) returns (ArchiveFileId) {}
rpc Retrieve (RetrieveRequest) returns (google.protobuf.Empty) {}
rpc Archive (ArchiveRequest) returns (ArchiveResponse) {}
rpc Retrieve (RetrieveRequest) returns (RetrieveResponse) {}
rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment