diff --git a/frontend_svc/gRPC/FrontendGRpcSvc.cpp b/frontend_svc/gRPC/FrontendGRpcSvc.cpp index 2129adeb68b75c01a04c3c411f80b11e24264bf1..f7f0488bc5b5b28d94086b5fc3f31c2c9a5ad869 100644 --- a/frontend_svc/gRPC/FrontendGRpcSvc.cpp +++ b/frontend_svc/gRPC/FrontendGRpcSvc.cpp @@ -25,112 +25,46 @@ Status CtaRpcImpl::Version(::grpc::ServerContext *context, const ::google::proto return Status::OK; } -Status CtaRpcImpl::GetStorageClasses(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, - ::grpc::ServerWriter<::cta::admin::StorageClassLsItem>* writer) { - - ::cta::admin::StorageClassLsItem storaceClass_item; - for (const auto &sc: m_catalogue->getStorageClasses()) { - - storaceClass_item.Clear(); - - storaceClass_item.set_name(sc.name); - storaceClass_item.set_nb_copies(sc.nbCopies); - storaceClass_item.set_vo(sc.vo.name); - storaceClass_item.mutable_creation_log()->set_username(sc.creationLog.username); - storaceClass_item.mutable_creation_log()->set_host(sc.creationLog.host); - storaceClass_item.mutable_creation_log()->set_time(sc.creationLog.time); - storaceClass_item.mutable_last_modification_log()->set_username(sc.lastModificationLog.username); - storaceClass_item.mutable_last_modification_log()->set_host(sc.lastModificationLog.host); - storaceClass_item.mutable_last_modification_log()->set_time(sc.lastModificationLog.time); - storaceClass_item.set_comment(sc.comment); - writer->Write(storaceClass_item, grpc::WriteOptions()); - } - return Status::OK; -} - -Status CtaRpcImpl::GetTapes(::grpc::ServerContext *context, const ::google::protobuf::Empty *request, - ::grpc::ServerWriter<::cta::admin::TapeLsItem> *writer) { - - ::cta::admin::TapeLsItem tape_item; - - for (const auto &tape: m_catalogue->getTapes()) { - - tape_item.Clear(); - - tape_item.set_vid(tape.vid); - tape_item.set_media_type(tape.mediaType); - tape_item.set_vendor(tape.vendor); - tape_item.set_logical_library(tape.logicalLibraryName); - tape_item.set_tapepool(tape.tapePoolName); - tape_item.set_vo(tape.vo); - tape_item.set_encryption_key_name((bool) tape.encryptionKeyName ? tape.encryptionKeyName.value() : "-"); - tape_item.set_capacity(tape.capacityInBytes); - tape_item.set_occupancy(tape.dataOnTapeInBytes); - tape_item.set_last_fseq(tape.lastFSeq); - tape_item.set_full(tape.full); - tape_item.set_from_castor(tape.isFromCastor); - tape_item.set_read_mount_count(tape.readMountCount); - tape_item.set_write_mount_count(tape.writeMountCount); - tape_item.set_nb_master_files(tape.nbMasterFiles); - tape_item.set_master_data_in_bytes(tape.masterDataInBytes); - - if (tape.labelLog) { - ::cta::common::TapeLog *labelLog = tape_item.mutable_label_log(); - labelLog->set_drive(tape.labelLog.value().drive); - labelLog->set_time(tape.labelLog.value().time); - } - if (tape.lastWriteLog) { - ::cta::common::TapeLog *lastWriteLog = tape_item.mutable_last_written_log(); - lastWriteLog->set_drive(tape.lastWriteLog.value().drive); - lastWriteLog->set_time(tape.lastWriteLog.value().time); - } - if (tape.lastReadLog) { - ::cta::common::TapeLog *lastReadLog = tape_item.mutable_last_read_log(); - lastReadLog->set_drive(tape.lastReadLog.value().drive); - lastReadLog->set_time(tape.lastReadLog.value().time); - } - ::cta::common::EntryLog *creationLog = tape_item.mutable_creation_log(); - creationLog->set_username(tape.creationLog.username); - creationLog->set_host(tape.creationLog.host); - creationLog->set_time(tape.creationLog.time); - ::cta::common::EntryLog *lastModificationLog = tape_item.mutable_last_modification_log(); - lastModificationLog->set_username(tape.lastModificationLog.username); - lastModificationLog->set_host(tape.lastModificationLog.host); - lastModificationLog->set_time(tape.lastModificationLog.time); - tape_item.set_comment(tape.comment); - - tape_item.set_state(tape.getStateStr()); - tape_item.set_state_reason(tape.stateReason ? tape.stateReason.value() : ""); - tape_item.set_state_update_time(tape.stateUpdateTime); - tape_item.set_state_modified_by(tape.stateModifiedBy); - - writer->Write(tape_item, grpc::WriteOptions()); - } - return Status::OK; -} - - -Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::eos::Notification* request, ::cta::xrd::Response* response) { +Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response) { m_log->log(cta::log::INFO, "Archive request"); - auto storageClassItor = request->file().xattr().find("sys.archive.storage_class"); - if(request->file().xattr().end() == storageClassItor) { - return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "storage class is not provided"); - } - const std::string storageClass = storageClassItor->second; - m_log->log(cta::log::INFO, "Archive request for " + storageClass); - + const std::string storageClass = request->file().storageclass(); + m_log->log(cta::log::DEBUG, "Archive request for storageClass: " + storageClass); cta::common::dataStructures::RequesterIdentity requester; requester.name = request->cli().user().username(); requester.group = request->cli().user().groupname(); - auto instance = request->wf().instance().name(); + auto instance = request->instance().name(); try { uint64_t archiveFileId = m_scheduler->checkAndGetNextArchiveFileId(instance, storageClass, requester, *m_log); - std::cout << "New Fileid: " << archiveFileId << std::endl; + + cta::common::dataStructures::ArchiveRequest archiveRequest; + cta::checksum::ProtobufToChecksumBlob(request->file().csb(), archiveRequest.checksumBlob); + archiveRequest.diskFileInfo.owner_uid = 0; + archiveRequest.diskFileInfo.gid = 0; + archiveRequest.diskFileInfo.path = ""; + archiveRequest.diskFileID = request->file().fid(); + archiveRequest.fileSize = request->file().size(); + archiveRequest.requester.name = request->cli().user().username(); + archiveRequest.requester.group = request->cli().user().groupname(); + archiveRequest.storageClass = storageClass; + archiveRequest.srcURL = request->transport().dst_url(); + archiveRequest.archiveReportURL = request->transport().report_url(); + archiveRequest.archiveErrorReportURL = request->transport().error_report_url(); + archiveRequest.creationLog.host = "foo"; + archiveRequest.creationLog.username = instance; + archiveRequest.creationLog.time = time(nullptr); + + std::string archiveRequestAddr = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, *m_log); + m_log->log(cta::log::INFO, "Archive request for storageClass: " + storageClass + + " archiveFileId: " + std::to_string(archiveFileId) + + "RequestID: " + archiveRequestAddr); + + response->set_fid(archiveFileId); + } catch (cta::exception::Exception &ex) { m_log->log(cta::log::CRIT, ex.getMessageValue()); return ::grpc::Status(::grpc::StatusCode::INTERNAL, ex.getMessageValue()); @@ -139,6 +73,15 @@ Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::eos::Not return Status::OK; } + +Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response) { + return Status::OK; +} + +Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response) { + return Status::OK; +} + void CtaRpcImpl::run(const std::string server_address) { ServerBuilder builder; diff --git a/frontend_svc/gRPC/FrontendGRpcSvc.h b/frontend_svc/gRPC/FrontendGRpcSvc.h index 7d10e04d5b009823933a3f6fa44c574cac4cb9f8..e9bb634bca6c77a6f6ffee6afa2493f487688ccd 100644 --- a/frontend_svc/gRPC/FrontendGRpcSvc.h +++ b/frontend_svc/gRPC/FrontendGRpcSvc.h @@ -15,6 +15,7 @@ #include "common/log/StdoutLogger.hpp" #include "common/log/FileLogger.hpp" #include "common/log/LogLevel.hpp" +#include <common/checksum/ChecksumBlobSerDeser.hpp> #include "cta_rpc.grpc.pb.h" @@ -23,7 +24,7 @@ using cta::SchedulerDBInit_t; using cta::SchedulerDB_t; using cta::catalogue::Catalogue; using cta::rdbms::Login; -using cta::rpc::CtaRpc; +using cta::dcache::rpc::CtaRpc; using grpc::Server; using grpc::ServerBuilder; @@ -43,10 +44,10 @@ public: CtaRpcImpl(cta::log::LogContext *lc, std::unique_ptr<cta::catalogue::Catalogue> &catalogue, std::unique_ptr<cta::Scheduler> &scheduler); void run(const std::string server_address); Status Version(::grpc::ServerContext *context, const ::google::protobuf::Empty *request, ::cta::admin::Version *response); - Status GetTapes(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::cta::admin::TapeLsItem>* writer); - Status GetStorageClasses(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::cta::admin::StorageClassLsItem>* writer); - Status Archive(::grpc::ServerContext* context, const ::cta::eos::Notification* request, ::cta::xrd::Response* response); + Status Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveFileId* response); + Status Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::google::protobuf::Empty *response); + Status Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty *response); }; #endif //CTA_FRONTENDGRPCSVC_H diff --git a/frontend_svc/grpc-proto/protobuf/cta_rpc.proto b/frontend_svc/grpc-proto/protobuf/cta_rpc.proto index e6299a67255cebd25a267612ea7be292fdff1068..ffde5c0198fb02bafbb022fa056c3d3d3d718a22 100644 --- a/frontend_svc/grpc-proto/protobuf/cta_rpc.proto +++ b/frontend_svc/grpc-proto/protobuf/cta_rpc.proto @@ -1,22 +1,59 @@ - -syntax = "proto3"; +syntax = "proto3"; option java_multiple_files = true; -option java_package = "ch.cern.cta.rpc"; +option java_package = "org.dcache.cta.rpc"; option optimize_for = CODE_SIZE; -package cta.rpc; +package cta.dcache.rpc; import "google/protobuf/empty.proto"; +import "cta_common.proto"; import "cta_admin.proto"; import "cta_eos.proto"; // // gRPC interface to CTA frontend // + +message Metadata0 { + string fid = 1; // pnfsid + uint64 size = 2; + string storageClass = 3; + cta.common.ChecksumBlob csb = 4; +} + +message ArchiveFileId { + uint64 fid = 1; +} + +message ArchiveRequest { + cta.common.Service instance = 1; //< caller instance id + cta.eos.Client cli = 2; //< client information + cta.eos.Transport transport = 3; //< transport + Metadata0 file = 4; //< file meta data +} + +message RetrieveRequest { + cta.common.Service instance = 1; //< caller instance id + cta.eos.Client cli = 2; //< client information + cta.eos.Transport transport = 3; //< transport + Metadata0 file = 4; //< file meta data + ArchiveFileId archiveId = 5; +} + +message DeleteRequest { + cta.common.Service instance = 1; //< caller instance id + cta.eos.Client cli = 2; //< client information + cta.eos.Transport transport = 3; //< transport + Metadata0 file = 4; //< file meta data + ArchiveFileId archiveId = 5; +} + service CtaRpc { rpc Version (google.protobuf.Empty) returns (cta.admin.Version) {} - rpc GetTapes (google.protobuf.Empty) returns (stream cta.admin.TapeLsItem) {} - rpc GetStorageClasses (google.protobuf.Empty) returns (stream cta.admin.StorageClassLsItem) {} - rpc Archive (cta.eos.Notification) returns (cta.xrd.Response) {} + + rpc Archive (ArchiveRequest) returns (ArchiveFileId) {} + rpc Retrieve (RetrieveRequest) returns (google.protobuf.Empty) {} + rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {} + } \ No newline at end of file