Skip to content
Snippets Groups Projects
Commit 31f15f9f authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan :coffee:
Browse files

frontend-grpc: add stubs delete/retrieve methods

parent 17df3c72
No related branches found
No related tags found
No related merge requests found
......@@ -25,112 +25,46 @@ Status CtaRpcImpl::Version(::grpc::ServerContext *context, const ::google::proto
return Status::OK;
}
Status CtaRpcImpl::GetStorageClasses(::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
::grpc::ServerWriter<::cta::admin::StorageClassLsItem>* writer) {
::cta::admin::StorageClassLsItem storaceClass_item;
for (const auto &sc: m_catalogue->getStorageClasses()) {
storaceClass_item.Clear();
storaceClass_item.set_name(sc.name);
storaceClass_item.set_nb_copies(sc.nbCopies);
storaceClass_item.set_vo(sc.vo.name);
storaceClass_item.mutable_creation_log()->set_username(sc.creationLog.username);
storaceClass_item.mutable_creation_log()->set_host(sc.creationLog.host);
storaceClass_item.mutable_creation_log()->set_time(sc.creationLog.time);
storaceClass_item.mutable_last_modification_log()->set_username(sc.lastModificationLog.username);
storaceClass_item.mutable_last_modification_log()->set_host(sc.lastModificationLog.host);
storaceClass_item.mutable_last_modification_log()->set_time(sc.lastModificationLog.time);
storaceClass_item.set_comment(sc.comment);
writer->Write(storaceClass_item, grpc::WriteOptions());
}
return Status::OK;
}
Status CtaRpcImpl::GetTapes(::grpc::ServerContext *context, const ::google::protobuf::Empty *request,
::grpc::ServerWriter<::cta::admin::TapeLsItem> *writer) {
::cta::admin::TapeLsItem tape_item;
for (const auto &tape: m_catalogue->getTapes()) {
tape_item.Clear();
tape_item.set_vid(tape.vid);
tape_item.set_media_type(tape.mediaType);
tape_item.set_vendor(tape.vendor);
tape_item.set_logical_library(tape.logicalLibraryName);
tape_item.set_tapepool(tape.tapePoolName);
tape_item.set_vo(tape.vo);
tape_item.set_encryption_key_name((bool) tape.encryptionKeyName ? tape.encryptionKeyName.value() : "-");
tape_item.set_capacity(tape.capacityInBytes);
tape_item.set_occupancy(tape.dataOnTapeInBytes);
tape_item.set_last_fseq(tape.lastFSeq);
tape_item.set_full(tape.full);
tape_item.set_from_castor(tape.isFromCastor);
tape_item.set_read_mount_count(tape.readMountCount);
tape_item.set_write_mount_count(tape.writeMountCount);
tape_item.set_nb_master_files(tape.nbMasterFiles);
tape_item.set_master_data_in_bytes(tape.masterDataInBytes);
if (tape.labelLog) {
::cta::common::TapeLog *labelLog = tape_item.mutable_label_log();
labelLog->set_drive(tape.labelLog.value().drive);
labelLog->set_time(tape.labelLog.value().time);
}
if (tape.lastWriteLog) {
::cta::common::TapeLog *lastWriteLog = tape_item.mutable_last_written_log();
lastWriteLog->set_drive(tape.lastWriteLog.value().drive);
lastWriteLog->set_time(tape.lastWriteLog.value().time);
}
if (tape.lastReadLog) {
::cta::common::TapeLog *lastReadLog = tape_item.mutable_last_read_log();
lastReadLog->set_drive(tape.lastReadLog.value().drive);
lastReadLog->set_time(tape.lastReadLog.value().time);
}
::cta::common::EntryLog *creationLog = tape_item.mutable_creation_log();
creationLog->set_username(tape.creationLog.username);
creationLog->set_host(tape.creationLog.host);
creationLog->set_time(tape.creationLog.time);
::cta::common::EntryLog *lastModificationLog = tape_item.mutable_last_modification_log();
lastModificationLog->set_username(tape.lastModificationLog.username);
lastModificationLog->set_host(tape.lastModificationLog.host);
lastModificationLog->set_time(tape.lastModificationLog.time);
tape_item.set_comment(tape.comment);
tape_item.set_state(tape.getStateStr());
tape_item.set_state_reason(tape.stateReason ? tape.stateReason.value() : "");
tape_item.set_state_update_time(tape.stateUpdateTime);
tape_item.set_state_modified_by(tape.stateModifiedBy);
writer->Write(tape_item, grpc::WriteOptions());
}
return Status::OK;
}
Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::eos::Notification* request, ::cta::xrd::Response* response) {
Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response) {
m_log->log(cta::log::INFO, "Archive request");
auto storageClassItor = request->file().xattr().find("sys.archive.storage_class");
if(request->file().xattr().end() == storageClassItor) {
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "storage class is not provided");
}
const std::string storageClass = storageClassItor->second;
m_log->log(cta::log::INFO, "Archive request for " + storageClass);
const std::string storageClass = request->file().storageclass();
m_log->log(cta::log::DEBUG, "Archive request for storageClass: " + storageClass);
cta::common::dataStructures::RequesterIdentity requester;
requester.name = request->cli().user().username();
requester.group = request->cli().user().groupname();
auto instance = request->wf().instance().name();
auto instance = request->instance().name();
try {
uint64_t archiveFileId = m_scheduler->checkAndGetNextArchiveFileId(instance, storageClass, requester, *m_log);
std::cout << "New Fileid: " << archiveFileId << std::endl;
cta::common::dataStructures::ArchiveRequest archiveRequest;
cta::checksum::ProtobufToChecksumBlob(request->file().csb(), archiveRequest.checksumBlob);
archiveRequest.diskFileInfo.owner_uid = 0;
archiveRequest.diskFileInfo.gid = 0;
archiveRequest.diskFileInfo.path = "";
archiveRequest.diskFileID = request->file().fid();
archiveRequest.fileSize = request->file().size();
archiveRequest.requester.name = request->cli().user().username();
archiveRequest.requester.group = request->cli().user().groupname();
archiveRequest.storageClass = storageClass;
archiveRequest.srcURL = request->transport().dst_url();
archiveRequest.archiveReportURL = request->transport().report_url();
archiveRequest.archiveErrorReportURL = request->transport().error_report_url();
archiveRequest.creationLog.host = "foo";
archiveRequest.creationLog.username = instance;
archiveRequest.creationLog.time = time(nullptr);
std::string archiveRequestAddr = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, *m_log);
m_log->log(cta::log::INFO, "Archive request for storageClass: " + storageClass
+ " archiveFileId: " + std::to_string(archiveFileId)
+ "RequestID: " + archiveRequestAddr);
response->set_fid(archiveFileId);
} catch (cta::exception::Exception &ex) {
m_log->log(cta::log::CRIT, ex.getMessageValue());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, ex.getMessageValue());
......@@ -139,6 +73,15 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::eos::Not
return Status::OK;
}
Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response) {
return Status::OK;
}
Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response) {
return Status::OK;
}
void CtaRpcImpl::run(const std::string server_address) {
ServerBuilder builder;
......
......@@ -15,6 +15,7 @@
#include "common/log/StdoutLogger.hpp"
#include "common/log/FileLogger.hpp"
#include "common/log/LogLevel.hpp"
#include <common/checksum/ChecksumBlobSerDeser.hpp>
#include "cta_rpc.grpc.pb.h"
......@@ -23,7 +24,7 @@ using cta::SchedulerDBInit_t;
using cta::SchedulerDB_t;
using cta::catalogue::Catalogue;
using cta::rdbms::Login;
using cta::rpc::CtaRpc;
using cta::dcache::rpc::CtaRpc;
using grpc::Server;
using grpc::ServerBuilder;
......@@ -43,10 +44,10 @@ public:
CtaRpcImpl(cta::log::LogContext *lc, std::unique_ptr<cta::catalogue::Catalogue> &catalogue, std::unique_ptr<cta::Scheduler> &scheduler);
void run(const std::string server_address);
Status Version(::grpc::ServerContext *context, const ::google::protobuf::Empty *request, ::cta::admin::Version *response);
Status GetTapes(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::cta::admin::TapeLsItem>* writer);
Status GetStorageClasses(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::cta::admin::StorageClassLsItem>* writer);
Status Archive(::grpc::ServerContext* context, const ::cta::eos::Notification* request, ::cta::xrd::Response* response);
Status Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response);
Status Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response);
Status Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response);
};
#endif //CTA_FRONTENDGRPCSVC_H
syntax = "proto3";
syntax = "proto3";
option java_multiple_files = true;
option java_package = "ch.cern.cta.rpc";
option java_package = "org.dcache.cta.rpc";
option optimize_for = CODE_SIZE;
package cta.rpc;
package cta.dcache.rpc;
import "google/protobuf/empty.proto";
import "cta_common.proto";
import "cta_admin.proto";
import "cta_eos.proto";
//
// gRPC interface to CTA frontend
//
message Metadata0 {
string fid = 1; // pnfsid
uint64 size = 2;
string storageClass = 3;
cta.common.ChecksumBlob csb = 4;
}
message ArchiveFileId {
uint64 fid = 1;
}
message ArchiveRequest {
cta.common.Service instance = 1; //< caller instance id
cta.eos.Client cli = 2; //< client information
cta.eos.Transport transport = 3; //< transport
Metadata0 file = 4; //< file meta data
}
message RetrieveRequest {
cta.common.Service instance = 1; //< caller instance id
cta.eos.Client cli = 2; //< client information
cta.eos.Transport transport = 3; //< transport
Metadata0 file = 4; //< file meta data
ArchiveFileId archiveId = 5;
}
message DeleteRequest {
cta.common.Service instance = 1; //< caller instance id
cta.eos.Client cli = 2; //< client information
cta.eos.Transport transport = 3; //< transport
Metadata0 file = 4; //< file meta data
ArchiveFileId archiveId = 5;
}
service CtaRpc {
rpc Version (google.protobuf.Empty) returns (cta.admin.Version) {}
rpc GetTapes (google.protobuf.Empty) returns (stream cta.admin.TapeLsItem) {}
rpc GetStorageClasses (google.protobuf.Empty) returns (stream cta.admin.StorageClassLsItem) {}
rpc Archive (cta.eos.Notification) returns (cta.xrd.Response) {}
rpc Archive (ArchiveRequest) returns (ArchiveFileId) {}
rpc Retrieve (RetrieveRequest) returns (google.protobuf.Empty) {}
rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment