Commit 1be848f6 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

src: rename frontend_svc subdir to cta-dcache

parents
# @project The CERN Tape Archive (CTA)
# @copyright 2021 DESY
# @license This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
cmake_minimum_required (VERSION 2.6)
add_subdirectory(gRPC)
# @project The CERN Tape Archive (CTA)
# @copyright Copyright(C) 2021 DESY
# @license This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
cmake_minimum_required (VERSION 2.6)
find_package(Protobuf3 REQUIRED)
include_directories(${CMAKE_BINARY_DIR}/eos_cta
${PROTOBUF3_INCLUDE_DIRS})
add_executable(cta-dcache FrontendGRpcSvc.cpp)
target_link_libraries(cta-dcache CtaGRpc
${PROTOBUF3_LIBRARIES} ${GRPC_GRPC++_LIBRARY}
ctascheduler ctacommon ctaobjectstore ctacatalogue)
set_property(TARGET cta-dcache APPEND PROPERTY INSTALL_RPATH ${PROTOBUF3_RPATH})
install(TARGETS cta-dcache DESTINATION usr/bin)
install (FILES cta-dcache.service DESTINATION etc/systemd/system)
\ No newline at end of file
/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2021 CERN
* @copyright Copyright(C) 2021 DESY
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "FrontendGRpcSvc.h"
#include "version.h"
Status CtaRpcImpl::Version(::grpc::ServerContext *context, const ::google::protobuf::Empty *request,
::cta::admin::Version *response) {
response->set_cta_version(CTA_VERSION);
response->set_xrootd_ssi_protobuf_interface_version("gPRC-frontend v0.0.1");
return Status::OK;
}
Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveResponse* response) {
m_log->log(cta::log::INFO, "Archive request");
const std::string storageClass = request->file().storageclass();
m_log->log(cta::log::DEBUG, "Archive request for storageClass: " + storageClass);
cta::common::dataStructures::RequesterIdentity requester;
requester.name = request->cli().user().username();
requester.group = request->cli().user().groupname();
auto instance = request->instance().name();
try {
uint64_t archiveFileId = m_scheduler->checkAndGetNextArchiveFileId(instance, storageClass, requester, *m_log);
cta::common::dataStructures::ArchiveRequest archiveRequest;
cta::checksum::ProtobufToChecksumBlob(request->file().csb(), archiveRequest.checksumBlob);
archiveRequest.diskFileInfo.owner_uid = 1;
archiveRequest.diskFileInfo.gid = 1;
archiveRequest.diskFileInfo.path = "";
archiveRequest.diskFileID = request->file().fid();
archiveRequest.fileSize = request->file().size();
archiveRequest.requester.name = request->cli().user().username();
archiveRequest.requester.group = request->cli().user().groupname();
archiveRequest.storageClass = storageClass;
archiveRequest.srcURL = request->transport().dst_url();
archiveRequest.archiveReportURL = request->transport().report_url() + "?archiveid=" + std::to_string(archiveFileId);
archiveRequest.archiveErrorReportURL = request->transport().error_report_url();
archiveRequest.creationLog.host = context->peer();
archiveRequest.creationLog.username = instance;
archiveRequest.creationLog.time = time(nullptr);
std::string reqId = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, *m_log);
m_log->log(cta::log::INFO, "Archive request for storageClass: " + storageClass
+ " archiveFileId: " + std::to_string(archiveFileId)
+ "RequestID: " + reqId);
response->set_fid(archiveFileId);
response->set_reqid(reqId);
} catch (cta::exception::Exception &ex) {
m_log->log(cta::log::CRIT, ex.getMessageValue());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, ex.getMessageValue());
}
return Status::OK;
}
Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty* response) {
m_log->log(cta::log::DEBUG, "Delete request");
auto instance = request->instance().name();
// Unpack message
cta::common::dataStructures::DeleteArchiveRequest deleteRequest;
deleteRequest.requester.name = request->cli().user().username();
deleteRequest.requester.group = request->cli().user().groupname();
deleteRequest.diskFilePath = "";
deleteRequest.diskFileId = request->file().fid();
deleteRequest.diskInstance = instance;
deleteRequest.archiveFileID = request->archiveid();
// Delete the file from the catalogue or from the objectstore if archive request is created
cta::utils::Timer t;
try {
deleteRequest.archiveFile = m_catalogue->getArchiveFileById(deleteRequest.archiveFileID);
} catch (cta::exception::Exception &ex){
// TODO add logging
}
m_scheduler->deleteArchive(instance, deleteRequest, *m_log);
m_log->log(cta::log::INFO, "archive file deleted.");
return Status::OK;
}
Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::cta::dcache::rpc::RetrieveResponse *response) {
const std::string storageClass = request->file().storageclass();
m_log->log(cta::log::DEBUG, "Retrieve request for storageClass: " + storageClass);
auto instance = request->instance().name();
// Unpack message
cta::common::dataStructures::RetrieveRequest retrieveRequest;
retrieveRequest.requester.name = request->cli().user().username();
retrieveRequest.requester.group = request->cli().user().groupname();
retrieveRequest.dstURL = request->transport().dst_url();
retrieveRequest.errorReportURL = request->transport().error_report_url();
retrieveRequest.diskFileInfo.owner_uid = 1;
retrieveRequest.diskFileInfo.gid = 1;
retrieveRequest.diskFileInfo.path = "";
retrieveRequest.creationLog.host = context->peer();
retrieveRequest.creationLog.username = instance;
retrieveRequest.creationLog.time = time(nullptr);
retrieveRequest.isVerifyOnly = false;
retrieveRequest.archiveFileID = request->archiveid();
cta::utils::Timer t;
// Queue the request
std::string reqId = m_scheduler->queueRetrieve(instance, retrieveRequest, *m_log);
m_log->log(cta::log::INFO, "Retrieve request for storageClass: " + storageClass
+ " archiveFileId: " + std::to_string(retrieveRequest.archiveFileID)
+ "RequestID: " + reqId);
response->set_reqid(reqId);
return Status::OK;
}
void CtaRpcImpl::run(const std::string server_address) {
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(this);
std::unique_ptr <Server> server(builder.BuildAndStart());
m_log->log(cta::log::INFO, "Listening on socket address: " + server_address);
server->Wait();
}
CtaRpcImpl::CtaRpcImpl(cta::log::LogContext *lc, std::unique_ptr<cta::catalogue::Catalogue> &catalogue, std::unique_ptr <cta::Scheduler> &scheduler):
m_catalogue(std::move(catalogue)), m_scheduler(std::move(scheduler)) {
m_log = lc;
}
using namespace cta;
using namespace cta::common;
int main(const int argc, char *const *const argv) {
std::unique_ptr <cta::log::Logger> logger = std::unique_ptr<cta::log::Logger>(new log::StdoutLogger("cta-dev", "cta-grpc-frontend", true));
log::LogContext lc(*logger);
// use castor config to avoid dependency on xroot-ssi
Configuration config("/etc/cta/cta.conf");
std::string server_address("0.0.0.0:17017");
// Initialise the Catalogue
std::string catalogueConfigFile = "/etc/cta/cta-catalogue.conf";
const rdbms::Login catalogueLogin = rdbms::Login::parseFile(catalogueConfigFile);
const uint64_t nbArchiveFileListingConns = 2;
auto catalogueFactory = catalogue::CatalogueFactoryFactory::create(*logger, catalogueLogin,
10,
nbArchiveFileListingConns);
auto catalogue = catalogueFactory->create();
try {
catalogue->ping();
lc.log(log::INFO, "Connected to catalog " + catalogue->getSchemaVersion().getSchemaVersion<std::string>());
} catch (cta::exception::Exception &ex) {
lc.log(cta::log::CRIT, ex.getMessageValue());
exit(1);
}
// Initialise the Scheduler
auto backed = config.getConfEntString("ObjectStore", "BackendPath");
lc.log(log::INFO, "Using scheduler backend: " + backed);
auto sInit = cta::make_unique<SchedulerDBInit_t>("Frontend", backed, *logger);
auto scheddb = sInit->getSchedDB(*catalogue, *logger);
scheddb->setBottomHalfQueueSize(25000);
auto scheduler = cta::make_unique<cta::Scheduler>(*catalogue, *scheddb, 5, 2*1000*1000);
CtaRpcImpl svc(&lc, catalogue, scheduler);
svc.run(server_address);
}
#ifndef CTA_FRONTENDGRPCSVC_H
#define CTA_FRONTENDGRPCSVC_H
#include <grpcpp/grpcpp.h>
#include "catalogue/CatalogueFactoryFactory.hpp"
#include "rdbms/Login.hpp"
#include <common/Configuration.hpp>
#include <common/utils/utils.hpp>
#include <scheduler/Scheduler.hpp>
#include <scheduler/OStoreDB/OStoreDBInit.hpp>
#include "common/log/SyslogLogger.hpp"
#include "common/log/StdoutLogger.hpp"
#include "common/log/FileLogger.hpp"
#include "common/log/LogLevel.hpp"
#include <common/checksum/ChecksumBlobSerDeser.hpp>
#include "cta_rpc.grpc.pb.h"
using cta::Scheduler;
using cta::SchedulerDBInit_t;
using cta::SchedulerDB_t;
using cta::catalogue::Catalogue;
using cta::rdbms::Login;
using cta::dcache::rpc::CtaRpc;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
class CtaRpcImpl : public CtaRpc::Service {
private:
std::unique_ptr <cta::catalogue::Catalogue> m_catalogue;
std::unique_ptr <cta::SchedulerDB_t> m_scheddb;
std::unique_ptr <cta::SchedulerDBInit_t> m_scheddb_init;
std::unique_ptr <cta::Scheduler> m_scheduler;
cta::log::LogContext *m_log;
public:
CtaRpcImpl(cta::log::LogContext *lc, std::unique_ptr<cta::catalogue::Catalogue> &catalogue, std::unique_ptr<cta::Scheduler> &scheduler);
void run(const std::string server_address);
Status Version(::grpc::ServerContext *context, const ::google::protobuf::Empty *request, ::cta::admin::Version *response);
Status Archive(::grpc::ServerContext* context, const ::cta::dcache::rpc::ArchiveRequest* request, ::cta::dcache::rpc::ArchiveResponse* response);
Status Retrieve(::grpc::ServerContext* context, const ::cta::dcache::rpc::RetrieveRequest* request, ::cta::dcache::rpc::RetrieveResponse* response);
Status Delete(::grpc::ServerContext* context, const ::cta::dcache::rpc::DeleteRequest* request, ::google::protobuf::Empty* response);
};
#endif //CTA_FRONTENDGRPCSVC_H
[Unit]
Description=dCache frontend for CERN Tape Archive (CTA)
After=syslog.target network-online.target
[Service]
ExecStart=/usr/bin/cta-dcache
Type=simple
Restart=always
User=cta
Group=cta
[Install]
WantedBy=default.target
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.dcache.cta.rpc";
option optimize_for = CODE_SIZE;
package cta.dcache.rpc;
import "google/protobuf/empty.proto";
import "cta_common.proto";
import "cta_admin.proto";
import "cta_eos.proto";
//
// gRPC interface to CTA frontend
//
/*
* File metadata
*/
message FileInfo {
string fid = 1; // disk system unique file ID
uint64 size = 2; // file size
string storageClass = 3; // tape system related storage class (file family)
cta.common.ChecksumBlob csb = 4; // set of knows checksums for the given file
}
/*
* Response to the ARCHIVE request.
*/
message ArchiveResponse {
uint64 fid = 1; // tape system unique file ID
string reqId = 2; // tape request scheduler ID, used to cancel the request
}
/*
* Response to the RETRIEVE request.
*/
message RetrieveResponse {
string reqId = 1; // tape request scheduler ID, used to cancel the request
}
/*
* ARCHIVE request.
*/
message ArchiveRequest {
cta.common.Service instance = 1; // client instance ID
cta.eos.Client cli = 2; // requester information
cta.eos.Transport transport = 3; // IO, error and success endpoints
FileInfo file = 4; // files' metadata
}
/*
* ARCHIVE request.
*/
message RetrieveRequest {
cta.common.Service instance = 1; // client instance ID
cta.eos.Client cli = 2; // requester information
cta.eos.Transport transport = 3; // IO, error and success endpoints
FileInfo file = 4; // files' metadata
uint64 archiveId = 5; // tape system unique file ID
}
message DeleteRequest {
cta.common.Service instance = 1; // client instance ID
cta.eos.Client cli = 2; // requester information
cta.eos.Transport transport = 3; // IO, error and success endpoints
FileInfo file = 4; // files' metadata
uint64 archiveId = 5; // tape system unique file ID
}
service CtaRpc {
rpc Version (google.protobuf.Empty) returns (cta.admin.Version) {}
rpc Archive (ArchiveRequest) returns (ArchiveResponse) {}
rpc Retrieve (RetrieveRequest) returns (RetrieveResponse) {}
rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {}
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment