diff --git a/xroot_plugins/CMakeLists.txt b/xroot_plugins/CMakeLists.txt index e99a81f44230bed72dd6067fa60d1bb29b38f30f..73c1f5331fa2215c09dd6521f75d574fb0fb9376 100644 --- a/xroot_plugins/CMakeLists.txt +++ b/xroot_plugins/CMakeLists.txt @@ -49,8 +49,6 @@ endif (OCCI_SUPPORT) # XRootD SSI plugin for CTA Frontend -add_subdirectory(messages) - include_directories(../xroot_ssi_pb) add_library(XrdSsiCta MODULE XrdSsiCtaServiceProvider.cpp XrdSsiCtaRequestProc.cpp XrdSsiCtaRequestMessage.cpp ../cmdline/CtaAdminCmdParse.cpp) diff --git a/xroot_plugins/messages/CMakeLists.txt b/xroot_plugins/messages/CMakeLists.txt deleted file mode 100644 index ec7f55fee1f8265b9867494442bfb1b5c9f9abbe..0000000000000000000000000000000000000000 --- a/xroot_plugins/messages/CMakeLists.txt +++ /dev/null @@ -1,38 +0,0 @@ -# The CERN Tape Archive (CTA) project -# Copyright (C) 2015 CERN -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -cmake_minimum_required (VERSION 2.6) - -find_package(Protobuf3 REQUIRED) - -file(GLOB ProtoFiles "${CMAKE_CURRENT_SOURCE_DIR}/*.proto") -PROTOBUF3_GENERATE_CPP(ProtoSources ProtoHeaders ${ProtoFiles}) -foreach(PROTO_SRC ${ProtoSources}) - set_property(SOURCE ${PROTO_SRC} PROPERTY COMPILE_FLAGS " -Wno-missing-field-initializers -fPIC -Wno-narrowing -Wno-implicit-fallthrough") - - # Add -Wno-narrowing -Wno-implicit-fallthrough compiler flags if using gcc - # version 7 or greater - if (CMAKE_COMPILER_IS_GNUCC) - if (GCC_VERSION VERSION_EQUAL 7 OR GCC_VERSION VERSION_GREATER 7) - set_property(SOURCE ${PROTO_SRC} APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-narrowing -Wno-implicit-fallthrough") - endif (GCC_VERSION VERSION_EQUAL 7 OR GCC_VERSION VERSION_GREATER 7) - endif (CMAKE_COMPILER_IS_GNUCC) -endforeach(PROTO_SRC) -set (CTA_FRONT_END_MESSAGES_SRC_FILES ${ProtoSources}) - -include_directories(${PROTOBUF3_INCLUDE_DIRS}) -add_library (XrdCtaMessages ${CTA_FRONT_END_MESSAGES_SRC_FILES}) - -target_link_libraries (XrdCtaMessages ${PROTOBUF3_LIBRARIES}) diff --git a/xroot_plugins/messages/CtaFrontendApi.hpp b/xroot_plugins/messages/CtaFrontendApi.hpp deleted file mode 100644 index 53a4b55bd79013d1031a9a04692335d9935ee104..0000000000000000000000000000000000000000 --- a/xroot_plugins/messages/CtaFrontendApi.hpp +++ /dev/null @@ -1,41 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief Bind the XRootD SSI transport layer to a set of Google Protocol Buffer definitions - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include "XrdSsiPbServiceClientSide.hpp" //!< XRootD SSI/Protocol Buffer Service, client-side bindings -#include "xroot_plugins/messages/cta_frontend.pb.h" //!< Auto-generated message types from .proto file - -/*! - * Bind the type of the XrdSsiService to the types defined in the .proto file - */ -typedef XrdSsiPb::ServiceClientSide<cta::xrd::Request, //!< XrdSSi Request message type - cta::xrd::Response, //!< XrdSsi Metadata message type - cta::xrd::Data, //!< XrdSsi Data message type - cta::xrd::Alert> //!< XrdSsi Alert message type - XrdSsiPbServiceType; - -/*! - * Bind the type of the XrdSsiRequest to the types defined in the .proto file - */ -typedef XrdSsiPb::Request<cta::xrd::Request, //!< XrdSSi Request message type - cta::xrd::Response, //!< XrdSsi Metadata message type - cta::xrd::Data, //!< XrdSsi Data message type - cta::xrd::Alert> //!< XrdSsi Alert message type - XrdSsiPbRequestType; - diff --git a/xroot_plugins/messages/cta_admin.proto b/xroot_plugins/messages/cta_admin.proto deleted file mode 100644 index 4170f0bc318ba7cadab4c2eca3609be2730a0bb5..0000000000000000000000000000000000000000 --- a/xroot_plugins/messages/cta_admin.proto +++ /dev/null @@ -1,156 +0,0 @@ -// @project The CERN Tape Archive (CTA) -// @brief CTA Admin Command API definition -// @copyright Copyright 2017 CERN -// @license This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -syntax = "proto3"; -package cta.admin; - -import "cta_common.proto"; - -// -// Command Options -// - -message OptionBoolean { - enum Key { - DISABLED = 0; - ENCRYPTED = 1; - FORCE = 2; - FULL = 3; - LBP = 4; - - // hasOption options - ALL = 5; - CHECK_CHECKSUM = 6; - EXTENDED = 7; - SHOW_HEADER = 8; - JUSTEXPAND = 9; - JUSTREPACK = 10; - SUMMARY = 11; - } - - Key key = 1; - bool value = 2; -} - -message OptionUInt64 { - enum Key { - ARCHIVE_FILE_ID = 0; - ARCHIVE_PRIORITY = 1; - RETRIEVE_PRIORITY = 2; - CAPACITY = 3; - COPY_NUMBER = 4; - FIRST_FSEQ = 5; - FILE_SIZE = 6; - LAST_FSEQ = 7; - MAX_DRIVES_ALLOWED = 8; - MIN_ARCHIVE_REQUEST_AGE = 9; - MIN_RETRIEVE_REQUEST_AGE = 10; - NUMBER_OF_FILES = 11; - PARTIAL = 12; - PARTIAL_TAPES_NUMBER = 13; - } - - Key key = 1; - uint64 value = 2; -} - -message OptionString { - enum Key { - COMMENT = 0; - DISKID = 1; - DRIVE = 2; - ENCRYPTION_KEY = 3; - FILENAME = 4; - GROUP = 5; - HOSTNAME = 6; - INPUT = 7; - INSTANCE = 8; - LOGICAL_LIBRARY = 9; - MOUNT_POLICY = 10; - OUTPUT = 11; - OWNER = 12; - PATH = 13; - STORAGE_CLASS = 14; - TAG = 15; - TAPE_POOL = 16; - USERNAME = 17; - VID = 18; - } - - Key key = 1; - string value = 2; -} - -// -// CTA Admin Command API -// - -message AdminCmd { - enum Cmd { - CMD_NONE = 0; - CMD_ADMIN = 1; - CMD_ADMINHOST = 2; - CMD_ARCHIVEFILE = 3; - CMD_ARCHIVEROUTE = 4; - CMD_DRIVE = 5; - CMD_GROUPMOUNTRULE = 6; - CMD_LISTPENDINGARCHIVES = 7; - CMD_LISTPENDINGRETRIEVES = 8; - CMD_LOGICALLIBRARY = 9; - CMD_MOUNTPOLICY = 10; - CMD_REPACK = 11; - CMD_REQUESTERMOUNTRULE = 12; - CMD_SHOWQUEUES = 13; - CMD_SHRINK = 14; - CMD_STORAGECLASS = 15; - CMD_TAPE = 16; - CMD_TAPEPOOL = 17; - CMD_TEST = 18; - CMD_VERIFY = 19; - } - enum SubCmd { - SUBCMD_NONE = 0; - SUBCMD_ADD = 1; - SUBCMD_CH = 2; - SUBCMD_ERR = 3; - SUBCMD_LABEL = 4; - SUBCMD_LS = 5; - SUBCMD_RECLAIM = 6; - SUBCMD_RM = 7; - SUBCMD_UP = 8; - SUBCMD_DOWN = 9; - SUBCMD_READ = 10; - SUBCMD_WRITE = 11; - SUBCMD_WRITE_AUTO = 12; - } - - Cmd cmd = 1; //< The primary command - SubCmd subcmd = 2; //< The secondary command - repeated OptionBoolean option_bool = 3; //< List of boolean options - repeated OptionUInt64 option_uint64 = 4; //< List of integer options - repeated OptionString option_str = 5; //< List of string options -} - -// -// Archive File Ls response stream protocol buffer -// - -message ArchiveFileLsItem { - cta.common.ArchiveFile af = 1; - cta.common.TapeFile tf = 2; - uint64 copy_nb = 3; -} - diff --git a/xroot_plugins/messages/cta_common.proto b/xroot_plugins/messages/cta_common.proto deleted file mode 100644 index 14b2e7cce437aeeae0df49b16f9eacf6a6ea434d..0000000000000000000000000000000000000000 --- a/xroot_plugins/messages/cta_common.proto +++ /dev/null @@ -1,76 +0,0 @@ -// @project The CERN Tape Archive (CTA) -// @brief Common types used by other protocol buffers -// @copyright Copyright 2017 CERN -// @license This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -syntax = "proto3"; -package cta.common; - -// -// Common types -// - -message Clock { - uint64 sec = 1; //< seconds of a clock - uint64 nsec = 2; //< nanoseconds of a clock -} - -message Checksum { - string type = 1; //< checksum type - string value = 2; //< checksum value -} - -message Service { - string name = 1; //< name of the service - string url = 2; //< access url of the service -} - -message Id { - uint64 uid = 1; //< user identity number - string username = 2; //< user name - uint64 gid = 3; //< group identity number - string groupname = 4; //< group name -} - -message Security { - string host = 1; //< client host - string app = 2; //< app string - string name = 3; //< security name - string prot = 4; //< security protocol - string grps = 5; //< security grps -} - -message DiskFileInfo { - string owner = 1; //< Owner of the disk file - string group = 2; //< Group of the disk file - string path = 3; //< Path of the disk file -} - -message ArchiveFile { - uint64 archive_id = 1; //< Archive File ID - string disk_instance = 2; //< Disk instance - string disk_id = 3; //< Disk file ID - uint64 size = 4; //< File size - Checksum cs = 5; //< Checksum - string storage_class = 6; //< Storage Class - DiskFileInfo df = 7; //< Disk File Info - uint64 creation_time = 8; //< Creation Time -} - -message TapeFile { - string vid = 1; //< Volume ID of the tape on which the file has been written - uint64 f_seq = 2; //< The position of the file on tape: File Sequence number - uint64 block_id = 3; //< The position of the file on tape: Logical Block ID -} - diff --git a/xroot_plugins/messages/cta_eos.proto b/xroot_plugins/messages/cta_eos.proto deleted file mode 100644 index c097a671c7b8f40382dc656f004a2a10b06c4200..0000000000000000000000000000000000000000 --- a/xroot_plugins/messages/cta_eos.proto +++ /dev/null @@ -1,89 +0,0 @@ -// @project The CERN Tape Archive (CTA) -// @brief CTA-EOS API definition -// @copyright Copyright 2017 CERN -// @license This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -syntax = "proto3"; -package cta.eos; - -import "cta_common.proto"; - -// -// Messages sent from EOS to CTA Frontend -// - -message Workflow { - enum EventType { - NONE = 0; - OPENR = 1; - OPENW = 2; - CLOSER = 3; - CLOSEW = 4; - DELETE = 5; - PREPARE = 6; - } - EventType event = 1; //< event - string queue = 2; //< queue - string wfname = 3; //< workflow - string vpath = 4; //< vpath - cta.common.Service instance = 5; //< instance information - uint64 timestamp = 6; //< event timestamp -} - -message Client { - cta.common.Id user = 1; //< acting client - cta.common.Security sec = 2; //< client security information -} - -message Transport { - string dst_url = 1; //< transport destination URL - string report_url = 2; //< URL to report successful archiving -} - -message Metadata { - uint64 fid = 1; //< file/container id - uint64 pid = 2; //< parent id - cta.common.Clock ctime = 3; //< change time - cta.common.Clock mtime = 4; //< modification time - cta.common.Clock btime = 5; //< birth time - cta.common.Clock ttime = 6; //< tree modification time - cta.common.Id owner = 7; //< ownership - uint64 size = 8; //< size - cta.common.Checksum cks = 9; //< checksum information - sint32 mode = 10; //< mode - string lpath = 11; //< logical path - map<string, string> xattr = 12; //< xattribute map -}; - -message Notification { - Workflow wf = 1; //< workflow - Client cli = 2; //< client information - Transport transport = 3; //< transport - Metadata file = 4; //< file meta data - Metadata directory = 5; //< directory meta data -} - - - -// -// Messages sent from the Tape Server to EOS -// - -//message Tapereplica { -// enum Status { NONE = 0; OFFTAPE = 1; ONTAPE = 2; ONTAPESAVE = 3; } -// uint64 fid = 1; //< file id -// Status status = 2; //< status for file id -// uint64 size = 3; //< File size as recorded on tape, for cross-check -// Checksum cks = 4; //< File checksum as computer while writing to tape -//} diff --git a/xroot_plugins/messages/cta_frontend.proto b/xroot_plugins/messages/cta_frontend.proto deleted file mode 100644 index 2a643d3962481e3ef1bd05e29e05a5bbd596e7a1..0000000000000000000000000000000000000000 --- a/xroot_plugins/messages/cta_frontend.proto +++ /dev/null @@ -1,74 +0,0 @@ -// @project The CERN Tape Archive (CTA) -// @brief CTA Frontend XRootD API definition -// @copyright Copyright 2017 CERN -// @license This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -syntax = "proto3"; -package cta.xrd; - -import "cta_admin.proto"; -import "cta_eos.proto"; - -// -// Requests sent to the CTA Frontend -// - -message Request { - oneof request { - cta.eos.Notification notification = 1; //< EOS WFE Notification - cta.admin.AdminCmd admincmd = 2; //< CTA Admin Command - } -} - - - -// -// Metadata responses sent by the CTA Frontend -// - -message Response { - enum ResponseType { - RSP_INVALID = 0; //< Response type was not set - RSP_SUCCESS = 1; //< Request is valid and was accepted for processing - RSP_ERR_PROTOBUF = 2; //< Framework error caused by Google Protocol Buffers layer - RSP_ERR_CTA = 3; //< Server error reported by CTA Frontend - RSP_ERR_USER = 4; //< User request is invalid - } - ResponseType type = 1; //< Encode the type of this response - string message_txt = 2; //< Optional response message text -} - - - -// -// Stream/Data responses sent by the CTA Frontend -// - -message Data { - // If there is a requirement for more than one type of streaming/data response, use "oneof" here - cta.admin.ArchiveFileLsItem af_ls_item = 1; -} - - - -// -// Alert Messages -// - -message Alert { - enum Audience { LOG = 0; ENDUSER = 1; } - Audience audience = 1; //< The intended audience of the message - string message_txt = 2; //< Text of the message -} - diff --git a/xroot_ssi_pb/README.md b/xroot_ssi_pb/README.md deleted file mode 100644 index 331f5d48752f5bd9f219491137601aabdd046f0c..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/README.md +++ /dev/null @@ -1,22 +0,0 @@ -# XRootD SSI + Google Protocol Buffers 3 - -This directory contains generic classes which bind Google Protocol Buffer definitions to the -XRootD SSI transport layer. It contains the following files: - -## Client Side - -* **XrdSsiPbServiceClientSide.hpp** : Wraps up the Service factory object with Protocol Buffer integration - and synchronisation between Requests and Responses -* **XrdSsiPbRequest.hpp** : Send Requests and handle Responses - -## Server Side - -* **XrdSsiPbService.hpp** : Defines Service on server side: bind Request to Request Processor and Execute -* **XrdSsiPbRequestProc.hpp** : Process Request and send Response - -## Both Client and Server Side - -* **XrdSsiPbAlert.hpp** : Optional Alerts from Service to Client (_e.g._ log messages) -* **XrdSsiPbException.hpp** : Convert XRootD SSI and Protocol Buffer errors to exceptions -* **XrdSsiPbDebug.hpp** : Protocol Buffer inspection functions (for debugging, not required in production) - diff --git a/xroot_ssi_pb/XrdSsiPbAlert.hpp b/xroot_ssi_pb/XrdSsiPbAlert.hpp deleted file mode 100644 index 0ea43887d87304bce83ba5cf3306ead144a1916e..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbAlert.hpp +++ /dev/null @@ -1,82 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief Class to manage XRootD SSI alerts - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <XrdSsi/XrdSsiRespInfo.hh> -#include "XrdSsiPbException.hpp" - -namespace XrdSsiPb { - -/*! - * Alert message class. - * - * The SSI framework enforces the following rules for Alerts: - * - * 1. Alerts are sent in the order posted - * 2. All outstanding Alerts are sent before the final response is sent - * (i.e. before SetResponse() is called) - * 3. Once a final response is posted, subsequent Alert messages are discarded - * 4. If a request is cancelled, all pending Alerts are discarded - */ - -template<typename AlertType> -class AlertMsg : public XrdSsiRespInfoMsg -{ -public: - AlertMsg(const AlertType &alert) : XrdSsiRespInfoMsg(nullptr, 0) - { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] AlertMsg() constructor" << std::endl; -#endif - - // Serialize the Alert - - if(!alert.SerializeToString(&alert_str)) - { - throw PbException("alert.SerializeToString() failed"); - } - - msgBuf = const_cast<char*>(alert_str.c_str()); - msgLen = alert_str.size(); - } - - ~AlertMsg() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ~AlertMsg() destructor" << std::endl; -#endif - } - - /*! - * Method called by the framework to clean up after the Alert has been sent or discarded - */ - - void RecycleMsg(bool sent=true) { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] AlertMsg::RecycleMsg()" << std::endl; - std::cerr << "[DEBUG] Alert \"" << alert_str << "\" was " << (sent ? "sent." : "not sent.") << std::endl; -#endif - delete this; - } - -private: - std::string alert_str; -}; - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbDebug.hpp b/xroot_ssi_pb/XrdSsiPbDebug.hpp deleted file mode 100644 index f95590d6cdcfedd25a7faf011afe08606d7a8530..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbDebug.hpp +++ /dev/null @@ -1,62 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief Helper functions for debugging protocol buffers - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <iostream> -#include <stdio.h> -#include <google/protobuf/util/json_util.h> - -namespace XrdSsiPb { - -/*! - * Wrapper around google::protobuf::util::MessageToJsonString() which outputs to a stream - */ - -inline void OutputJsonString(std::ostream &os, const google::protobuf::Message *message) -{ - using namespace google::protobuf::util; - - std::string output; - JsonPrintOptions options; - - options.add_whitespace = true; - options.always_print_primitive_fields = true; - - MessageToJsonString(*message, &output, options); - - os << output << std::endl; -} - - - -/*! - * Inspect the contents of a serialized Protocol Buffer - */ - -inline void DumpBuffer(const std::string &buffer) -{ - for(size_t i = 0; i < buffer.size(); ++i) - { - fprintf(stderr, "%02X ", buffer[i]); - } - fprintf(stderr, "\n"); -} - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbException.hpp b/xroot_ssi_pb/XrdSsiPbException.hpp deleted file mode 100644 index 91f978d00e802e2cb5ffb5f60d286b1f84182418..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbException.hpp +++ /dev/null @@ -1,52 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief Classes to convert XRootD SSI/Google Protocol Buffers errors into exceptions - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <stdexcept> -#include <XrdSsi/XrdSsiErrInfo.hh> - -namespace XrdSsiPb { - -/*! - * Framework exception thrown by Google Protocol Buffers layer - */ - -class PbException : public std::runtime_error -{ -public: - PbException(const char *err_msg) : std::runtime_error(err_msg) {} - PbException(const std::string &err_msg) : std::runtime_error(err_msg) {} -}; - - - -/*! - * Framework exception thrown by XRootD/SSI layer - */ - -class XrdSsiException : public std::runtime_error -{ -public: - XrdSsiException(const char *err_msg) : std::runtime_error(err_msg) {} - XrdSsiException(const std::string &err_msg) : std::runtime_error(err_msg) {} - XrdSsiException(const XrdSsiErrInfo &eInfo) : std::runtime_error(eInfo.Get()) {} -}; - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbIStreamBuffer.hpp b/xroot_ssi_pb/XrdSsiPbIStreamBuffer.hpp deleted file mode 100644 index 078406983901dbcb56c3c15b26f541a8f5265a8e..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbIStreamBuffer.hpp +++ /dev/null @@ -1,199 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief Input stream to receive a stream of protocol buffers from the server - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <google/protobuf/io/coded_stream.h> -#include "XrdSsiPbException.hpp" - -namespace XrdSsiPb { - -/*! - * Input Stream Buffer class - * - * This implementation is for a record-based stream. The client should be configured with a XrdSsi stream buffer size - * which is large with respect to the maximum size of DataType. This is mainly for efficiency reasons as there is a - * little extra data copying overhead in handling records which are split across two SSI buffers. Note that there is - * also a hard limit: the record size cannot exceed the buffer size. - * - * The buffer size parameter is set in the constructor to XrdSsiPbServiceClientSide. The size of DataType is the maximum - * encoded size of the DataType protocol buffer on the wire. - * - * If there is a requirement to stream arbitrarily large binary blobs rather than records, this functionality will need - * to be added. See the comments on Request::ProcessResponseData(). - */ -template<typename DataType> -class IStreamBuffer -{ -public: - IStreamBuffer(uint32_t bufsize) : - m_max_msglen(bufsize-sizeof(uint32_t)), - m_split_buffer(std::unique_ptr<uint8_t[]>(new uint8_t[m_max_msglen])), - m_split_buflen(0) - { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] IStreamBuffer() constructor" << std::endl; -#endif - } - - ~IStreamBuffer() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] IStreamBuffer() destructor" << std::endl; -#endif - } - - /*! - * Push a new buffer onto the input stream - * - * NOTE: This method is not reentrant; it is assumed it will be called from the XrdSsi framework - * in single-threaded mode. Each client or client thread must set up its own stream. - * - * @param[in] buf_ptr XRootD SSI stream or data buffer - * @param[in] buf_len Size of buf_ptr - */ - void Push(const char *buf_ptr, int buf_len); - -private: - /*! - * Pop a single record from an input stream and pass it to the client - * - * If the message is split across the boundary between buffers, the partial message is saved and - * the method returns false. - * - * @param[in] msg_len Size of the next Protocol Buffer message on the wire - * @param[in,out] input_stream Protocol Buffer Coded Input Stream object wrapping the XRootD - * SSI buffer - * - * @retval true There are more messages to process on the stream - * @retval false End of the input stream was reached - */ - bool popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream); - - /*! - * Callback to handle stream data - * - * Define a specialised version of this method on the client side to handle a specific type of data stream - */ - void DataCallback(DataType record) const { - throw XrdSsiException("Stream/data payload received, but IStreamBuffer::DataCallback() has not been defined"); - } - - // Member variables - - const uint32_t m_max_msglen; //!< Maximum allowed length of a protobuf on the wire - std::unique_ptr<uint8_t[]> m_split_buffer; //!< Holding buffer for partial messages split across two input buffers - int m_split_buflen; //!< Length of data stored in m_split_buffer -}; - - - -template<typename DataType> -void IStreamBuffer<DataType>::Push(const char *buf_ptr, int buf_len) -{ - google::protobuf::io::CodedInputStream input_stream(reinterpret_cast<const uint8_t*>(buf_ptr), buf_len); - - uint32_t msg_len; - - if(m_split_buflen > 0) { - // Stitch together the saved partial record and the incoming record - - if(m_split_buflen <= sizeof(uint32_t)) { - // The size field is split across the boundary, just copy that one field - - int bytes_to_copy = sizeof(uint32_t) - m_split_buflen; - memcpy(m_split_buffer.get() + m_split_buflen, buf_ptr, bytes_to_copy); - input_stream.Skip(bytes_to_copy); - - google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len); - popRecord(msg_len, input_stream); - } else { - // The payload is split across the boundary, copy the entire record - - google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len); - if(msg_len > m_max_msglen) { - throw XrdSsiException("IStreamBuffer::Push(): Data record size (" + std::to_string(msg_len) + - " bytes) exceeds XRootD SSI buffer size (" + std::to_string(m_max_msglen) + " bytes)"); - } - int bytes_to_copy = msg_len + sizeof(uint32_t) - m_split_buflen; - memcpy(m_split_buffer.get() + m_split_buflen, buf_ptr, bytes_to_copy); - input_stream.Skip(bytes_to_copy); - - google::protobuf::io::CodedInputStream split_stream(reinterpret_cast<const uint8_t*>(m_split_buffer.get() + sizeof(uint32_t)), msg_len); - popRecord(msg_len, split_stream); - } - - m_split_buflen = 0; - } - - // Extract remaining records from the input buffer - - do { - const char *buf_ptr; - - // Get pointer to next record - if(!input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len)) break; - - if(buf_len < static_cast<int>(sizeof(uint32_t))) { - // Size field is split across the boundary, save the partial field and finish - m_split_buflen = buf_len; - memcpy(m_split_buffer.get(), buf_ptr, m_split_buflen); - break; - } - - // Get size of next item on the stream - input_stream.ReadLittleEndian32(&msg_len); - } while(popRecord(msg_len, input_stream)); -} - - - -template<typename DataType> -bool IStreamBuffer<DataType>::popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream) -{ - const char *buf_ptr; - int buf_len; - - if(msg_len > m_max_msglen) { - throw XrdSsiException("IStreamBuffer::popRecord(): Data record size (" + std::to_string(msg_len) + - " bytes) exceeds XRootD SSI buffer size (" + std::to_string(m_max_msglen) + " bytes)"); - } - - // Get pointer to next record - if(!input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len)) buf_len = 0; - - if(buf_len < msg_len) { - // Record payload is split across the boundary, save the partial record - google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(msg_len, m_split_buffer.get()); - memcpy(m_split_buffer.get() + sizeof(uint32_t), buf_ptr, buf_len); - m_split_buflen = buf_len + sizeof(uint32_t); - - return false; - } else { - DataType record; - - record.ParseFromArray(buf_ptr, msg_len); - input_stream.Skip(msg_len); - DataCallback(record); - - // If the message terminates at the end of the buffer, we are done, otherwise keep going - return buf_len != msg_len; - } -} - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbOStreamBuffer.hpp b/xroot_ssi_pb/XrdSsiPbOStreamBuffer.hpp deleted file mode 100644 index 3893bc992523071f6fe19934a19c88a02bf73715..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbOStreamBuffer.hpp +++ /dev/null @@ -1,117 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief Output stream to send a stream of protocol buffers from server to client - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <XrdSsi/XrdSsiStream.hh> - -namespace XrdSsiPb { - -/*! - * Output Stream Buffer class. - * - * This class binds XrdSsiStream::Buffer to the stream interface. - * - * This is a naïve implementation, where memory is allocated and deallocated on every use. It favours - * computation efficiency over memory efficiency: the buffer allocated is twice the hint size, but data - * copying is minimized. - * - * A more performant implementation could be implemented using a buffer pool, using the Recycle() - * method to return buffers to the pool. - */ -template<typename DataType> -class OStreamBuffer : public XrdSsiStream::Buffer -{ -public: - /*! - * Constructor - * - * data is a public member of XrdSsiStream::Buffer. It is an unmanaged char* pointer. We initialize - * it to double the hint size, with the implicit rule that the size of an individual serialized - * record on the wire cannot exceed the hint size. - */ - OStreamBuffer(uint32_t hint_size) : XrdSsiStream::Buffer(new char[hint_size * 2]), - m_hint_size(hint_size), - m_data_ptr(data), - m_data_size(0) { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] OStreamBuffer() constructor" << std::endl; -#endif - } - - ~OStreamBuffer() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] OStreamBuffer() destructor" << std::endl; -#endif - delete[] data; - } - - /*! - * Get the data size - */ - uint32_t Size() const { - return m_data_size; - } - - /*! - * Push a protobuf into the queue - * - * @retval true The buffer has been filled and is ready for sending - * @retval false There is room in the buffer for more records - */ - bool Push(const DataType &record) { - uint32_t bytesize = record.ByteSize(); - - if(m_data_size + bytesize > m_hint_size * 2) { - throw XrdSsiException("OStreamBuffer::Push(): Stream buffer overflow"); - } - - // Write the size of the next record into the buffer - google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(bytesize, reinterpret_cast<google::protobuf::uint8*>(m_data_ptr)); - m_data_ptr += sizeof(uint32_t); - - // Serialize the Protocol Buffer - record.SerializeToArray(m_data_ptr, bytesize); - m_data_ptr += bytesize; - - m_data_size += sizeof(uint32_t) + bytesize; - - return m_data_size >= m_hint_size; - } - -private: - /*! - * Called by the XrdSsi framework when it is finished with the object - */ - virtual void Recycle() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] OStreamBuffer::Recycle()" << std::endl; -#endif - delete this; - } - - // Member variables - - const uint32_t m_hint_size; //!< Requested size of the buffer from the XRootD framework - char *m_data_ptr; //!< Pointer to the raw storage - uint32_t m_data_size; //!< Total size of the buffer on the wire - std::vector<std::pair<uint32_t, DataType>> m_protobuf_q; //!< Queue of protobufs to be serialized -}; - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbRequest.hpp b/xroot_ssi_pb/XrdSsiPbRequest.hpp deleted file mode 100644 index 63065f93f6b14911b7b44be1577cf20ef3cd3351..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbRequest.hpp +++ /dev/null @@ -1,384 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief XRootD SSI Request class - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <future> -#include <XrdSsi/XrdSsiRequest.hh> -#include <XrdSsi/XrdSsiStream.hh> - -#ifdef XRDSSI_DEBUG -#include <XrdSsiPbDebug.hpp> -#endif - -#include "XrdSsiPbIStreamBuffer.hpp" - -namespace XrdSsiPb { - -/*! - * Request Callback class - * - * The client should specialize on this class for the Alert Response type. This permits an arbitrary - * number of Alert messages to be sent before each Response. These can be used for any purpose defined - * by the client and server, for example writing messages to the client log. - */ -template<typename CallbackArg> -class RequestCallback -{ -public: - void operator()(const CallbackArg &arg); -}; - - - -/*! - * Request class - */ -template <typename RequestType, typename MetadataType, typename DataType, typename AlertType> -class Request : public XrdSsiRequest -{ -public: - Request(const RequestType &request, unsigned int response_bufsize, uint16_t request_timeout); - - virtual ~Request() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ~Request destructor" << std::endl; -#endif - } - - /*! - * The implementation of GetRequest() must create request data, save it in some manner, and provide - * it to the framework. - */ - virtual char *GetRequest(int &reqlen) override - { - reqlen = m_request_str.size(); - return const_cast<char*>(m_request_str.c_str()); - } - - // Optionally also define the RelRequestBuffer() method to clean up when the framework no longer - // needs access to the data. The thread used to initiate a request may be the same one used in the - // GetRequest() call. - - virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo) override; - - virtual XrdSsiRequest::PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last) override; - - virtual void Alert(XrdSsiRespInfoMsg &alert_msg) override; - - /*! - * Return the future associated with this object's Metadata promise - */ - std::future<MetadataType> GetMetadataFuture() { return m_metadata_promise.get_future(); } - - /*! - * Return the future associated with this object's Data/Stream promise - */ - std::future<void> GetDataFuture() { return m_data_promise.get_future(); } - -private: - void ProcessResponseMetadata(); - - std::string m_request_str; //!< Request buffer - std::unique_ptr<char[]> m_response_buffer; //!< Pointer to storage for Data responses - char *m_response_bufptr; //!< Pointer to the Response buffer - int m_response_bufsize; //!< Size of the Response buffer - - std::promise<MetadataType> m_metadata_promise; //!< Promise a reply of Metadata type - std::promise<void> m_data_promise; //!< Promise a data or stream response - - IStreamBuffer<DataType> m_istream_buffer; //!< Input stream buffer object - - RequestCallback<AlertType> AlertCallback; //!< Callback for Alert messages -}; - - - -/*! - * Request constructor - */ -template<typename RequestType, typename MetadataType, typename DataType, typename AlertType> -Request<RequestType, MetadataType, DataType, AlertType>:: -Request(const RequestType &request, unsigned int response_bufsize, uint16_t request_timeout) : - m_response_bufsize(response_bufsize), - m_istream_buffer(response_bufsize) -{ -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Request constructor: " - << "Response buffer size = " << m_response_bufsize - << " bytes, request timeout = " << request_timeout << std::endl; -#endif - // Set XRootD request timeout - SetTimeOut(request_timeout); - - // Serialize the Request - if(!request.SerializeToString(&m_request_str)) - { - throw PbException("request.SerializeToString() failed"); - } -} - - - -/*! - * Process Responses from the server - * - * Requests are sent to the server asynchronously via the service object. ProcessResponse() informs - * the Request object on the client side if it completed or failed. - */ -template<typename RequestType, typename MetadataType, typename DataType, typename AlertType> -bool Request<RequestType, MetadataType, DataType, AlertType>::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo) -{ -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ProcessResponse(): response type = " << rInfo.State() << std::endl; -#endif - - try { - switch(rInfo.rType) { - - // Data and Metadata responses - - case XrdSsiRespInfo::isData: - // Process Metadata - ProcessResponseMetadata(); - - // Process Data - if(rInfo.blen > 0) - { - // For Data responses, we need to allocate the buffer to receive the data - m_response_buffer = std::unique_ptr<char[]>(new char[m_response_bufsize]); - m_response_bufptr = m_response_buffer.get(); - - // Process Data Response: copy one chunk of data into the buffer, then call ProcessResponseData() - GetResponseData(m_response_bufptr, m_response_bufsize); - } else { // Response is Metadata-only - - // Return control of the object to the calling thread and delete rInfo - Finished(); - - // It is now safe to delete the Request object (implies that the pointer on the calling side will - // never refer to it again and the destructor of the base class doesn't access any class members) - - delete this; - } - break; - - // Stream response - - case XrdSsiRespInfo::isStream: - // Process Metadata - ProcessResponseMetadata(); - - // For Stream responses, we need to allocate the buffer to receive the data - m_response_buffer = std::unique_ptr<char[]>(new char[m_response_bufsize]); - m_response_bufptr = m_response_buffer.get(); - - // Process Stream Response: copy one chunk of data into the buffer, then call ProcessResponseData() - GetResponseData(m_response_bufptr, m_response_bufsize); - - break; - - // Handle errors in the XRootD framework (e.g. no response from server) - - case XrdSsiRespInfo::isError: throw XrdSsiException(eInfo); - - // To implement detached requests, add another callback type which saves the handle - - case XrdSsiRespInfo::isHandle: throw XrdSsiException("Detached requests are not implemented."); - - // To implement file requests, add another callback type - - case XrdSsiRespInfo::isFile: throw XrdSsiException("File requests are not implemented."); - - // Handle invalid responses - - case XrdSsiRespInfo::isNone: - default: throw XrdSsiException("Invalid Response."); - } - } catch(std::exception &ex) { - // Use the exception to fulfil the promise - - m_metadata_promise.set_exception(std::current_exception()); - - Finished(); - delete this; - } - - return true; -} - - - -/*! - * Process Response Metadata - * - * A Response can (optionally) contain Metadata. This can be used for simple responses (e.g. status - * code, short message) or as the header for large asynchronous data transfers or streaming data. - */ -template<typename RequestType, typename MetadataType, typename DataType, typename AlertType> -void Request<RequestType, MetadataType, DataType, AlertType>::ProcessResponseMetadata() -{ - int metadata_len; - const char *metadata_buffer = GetMetadata(metadata_len); -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ProcessResponseMetadata(): received " << metadata_len << " bytes" << std::endl; -#endif -#if 0 - // Show metadata contents as a string of bytes - std::cerr << "[DEBUG] ProcessResponseMetadata(): "; - for(int i = 0; i < metadata_len; ++i) - { - std::cerr << "<" << static_cast<int>(*(metadata_buffer+i)) << ">"; - } - std::cerr << std::endl; -#endif - - // Deserialize the metadata - - MetadataType metadata; - - if(metadata.ParseFromArray(metadata_buffer, metadata_len)) - { - m_metadata_promise.set_value(metadata); - } - else - { - throw PbException("metadata.ParseFromArray() failed"); - } -} - - - -/*! - * Process Response Data. - * - * Data Responses are implemented as a binary stream, which is received one chunk at a time. - * The chunk size is defined when the Request object is instantiated (see m_response_bufsize). - * - * In this implementation, the data returned in the response buffer is record-based, where each - * record is a protocol buffer of type DataType. The framework ensures that the client application - * receives only complete records. - * - * An alternative implementation would be to return typeless blobs to the client application, - * possibly with the format defined in the metadata. This would make sense for cases where the - * data size is in excess of the chunk size. Currently there is no use case for this but - * it could be added in future if required. A possible implementation would be to use type traits - * on DataType to decide how it should be handled. - * - * ProcessResponseData() is called either by GetResponseData(), or asynchronously at any time for data - * streams. - * - * @retval PRD_Normal The response was accepted for processing - * @retval PRD_Hold The response could not be handled at this time. The callback will be placed - * in a global hold queue and the thread will be released. The client is - * responsible for calling the static method XrdSsiRequest::RestartDataResponse() - * to restart processing responses (in FIFO order). - */ -template<typename RequestType, typename MetadataType, typename DataType, typename AlertType> -XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, DataType, AlertType> - ::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last) -{ -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ProcessResponseData(): received " << response_buflen << " bytes" << std::endl; -#endif - - XrdSsiRequest::PRD_Xeq post_process = XrdSsiRequest::PRD_Normal; - - // The buffer length is set to -1 if an error occurred setting up the response - if(response_buflen == -1) - { - throw XrdSsiException(eInfo); - } - - // The buffer length can be 0 if the response is metadata only - if(response_buflen != 0) - { - // Push stream/data buffer onto the input stream for the client - m_istream_buffer.Push(response_bufptr, response_buflen); - } - - if(is_last) // No more data to come - { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ProcessResponseData(): done" << std::endl; -#endif - // Clean up - - // Set the data promise - m_data_promise.set_value(); - - // If Request objects are uniform, we could re-use them instead of deleting them, to avoid the - // overhead of repeated object creation. This would require a more complex Request factory. For - // now we just delete. - - Finished(); - delete this; - } - else - { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ProcessResponseData(): request more response data" << std::endl; -#endif - // If there is more data, get the next chunk - GetResponseData(m_response_bufptr, m_response_bufsize); - } - - return post_process; -} - - - -/*! - * Deserialize Alert messages and call the Alert callback - */ - -template<typename RequestType, typename MetadataType, typename DataType, typename AlertType> -void Request<RequestType, MetadataType, DataType, AlertType>::Alert(XrdSsiRespInfoMsg &alert_msg) -{ - try - { - // Get the Alert - - int alert_len; - char *alert_buffer = alert_msg.GetMsg(alert_len); - - // Deserialize the Alert - - AlertType alert; - - if(alert.ParseFromArray(alert_buffer, alert_len)) - { - AlertCallback(alert); - } - else - { - throw PbException("alert.ParseFromArray() failed"); - } - } - catch(std::exception &ex) - { - m_metadata_promise.set_exception(std::current_exception()); - } - - // Recycle the message to free memory - - alert_msg.RecycleMsg(); -} - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbRequestProc.hpp b/xroot_ssi_pb/XrdSsiPbRequestProc.hpp deleted file mode 100644 index f54370ab1e6470d04e3c214bb0e5baab22e73f97..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbRequestProc.hpp +++ /dev/null @@ -1,241 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief XRootD SSI Responder class template - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <future> - -#include <XrdSsi/XrdSsiResponder.hh> -#include <XrdSsi/XrdSsiResource.hh> -#include "XrdSsiPbException.hpp" -#include "XrdSsiPbAlert.hpp" - -namespace XrdSsiPb { - -/*! - * Exception Handler class. - * - * This is used to send framework exceptions back to the client. The client should specialize on this - * class for the Response class. - */ - -template<typename MetadataType, typename ExceptionType> -class ExceptionHandler -{ -public: - void operator()(MetadataType &response, const ExceptionType &ex); -}; - - - -/*! - * Request Processing class. - * - * This is an agent object that the Service object creates for each Request that it receives. The Request - * object will be bound to the XrdSsiResponder object via a call to XrdSsiResponder::BindRequest(). Once - * the relationship is established, the XrdSsi framework keeps track of the Request object and manages - * its lifetime. - * - * The XrdSsiResponder class contains the methods needed to interact with the Request object: get the - * Request, release storage, send Alerts, and post a Response. It also knows how to safely interact with - * the Request object, handling asynchronous requests such as cancellation, broken TCP connections, etc. - */ - -template <typename RequestType, typename MetadataType, typename AlertType> -class RequestProc : public XrdSsiResponder -{ -public: - RequestProc(XrdSsiResource &resource) : - m_resource(resource), - m_response_stream_ptr(nullptr) { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] RequestProc() constructor" << std::endl; -#endif - } - virtual ~RequestProc() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ~RequestProc() destructor" << std::endl; -#endif - } - - void Execute(); - virtual void Finished(XrdSsiRequest &rqstR, const XrdSsiRespInfo &rInfo, bool cancel=false) override; - -private: - /*! - * Encapsulate the Alert protocol buffer inside a XrdSsiRespInfoMsg object. - * - * Alert message objects are created on the heap with lifetime managed by the XrdSsiResponder class. - */ - - void Alert(const AlertType &alert) - { - XrdSsiResponder::Alert(*(new AlertMsg<AlertType>(alert))); - } - - /*! - * Handle bad protocol buffer Requests. - * - * This class should store the exception in the Response Protocol Buffer, which the framework will - * send back to the client. The client needs to define the specialized version of this class. - */ - - ExceptionHandler<MetadataType, PbException> Throw; - - /*! - * Execute action after deserialization of the Request Protocol Buffer. - * - * The client needs to define the specialized version of this method. - */ - - void ExecuteAction() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Called default RequestProc::ExecuteAction()" << std::endl; -#endif - } - - /* - * Member variables - */ - - const XrdSsiResource &m_resource; //!< Resource associated with the Request - std::promise<void> m_promise; //!< Promise that the Request has been processed - - /* - * Protocol Buffer members - * - * The Serialized Metadata Response buffer needs to be a member variable as it must stay in scope - * after calling RequestProc(), until Finished() is called. - * - * The maximum amount of metadata that may be sent is defined by XrdSsiResponder::MaxMetaDataSZ - * constant member. - */ - - RequestType m_request; //!< Request object - MetadataType m_metadata; //!< Metadata Response object - std::string m_metadata_str; //!< Serialized Metadata Response buffer - std::string m_response_str; //!< Serialized Data Response buffer - XrdSsiStream *m_response_stream_ptr; //!< Stream Response pointer -}; - - - -template <typename RequestType, typename MetadataType, typename AlertType> -void RequestProc<RequestType, MetadataType, AlertType>::Execute() -{ -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] RequestProc::Execute()" << std::endl; -#endif - - // Deserialize the Request - - int request_len; - const char *request_buffer = GetRequest(request_len); - - if(m_request.ParseFromArray(request_buffer, request_len)) - { - // Pass control from the framework to the application - - ExecuteAction(); - } - else - { - // Pass an exception back to the client and continue processing - - Throw(m_metadata, PbException("m_request.ParseFromArray() failed")); - } - - // Release the request buffer - - ReleaseRequestBuffer(); - - // Serialize and send the Metadata - - if(!m_metadata.SerializeToString(&m_metadata_str)) - { - throw PbException("m_metadata.SerializeToString() failed"); - } - SetMetadata(m_metadata_str.c_str(), m_metadata_str.size()); - - // Send the Response - - if(m_response_stream_ptr != nullptr) - { - // Stream Response - - SetResponse(m_response_stream_ptr); - } - else if(m_response_str.size() != 0) - { - // Data Response - - SetResponse(m_response_str.c_str(), m_response_str.size()); - } - else - { - // Metadata-only Response - // - // It is necessary to set a Response even for empty responses, otherwise Finished() - // will not be called on the Request. - - SetNilResponse(); - } - - // Wait for the framework to call Finished() - - auto finished = m_promise.get_future(); - - finished.wait(); -} - - - -/*! - * Clean up the Request Processing object. - * - * This is called when the Request has been processed or cancelled. - * - * If required, you can create specialized versions of this method to handle cancellation/cleanup for - * specific message types. - */ - -template <typename RequestType, typename MetadataType, typename AlertType> -void RequestProc<RequestType, MetadataType, AlertType>::Finished(XrdSsiRequest &rqstR, const XrdSsiRespInfo &rInfo, bool cancel) -{ -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] RequestProc::Finished()" << std::endl; -#endif - - if(cancel) - { - // Reclaim resources dedicated to the request and tell caller the request object can be reclaimed -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Request timed out or was cancelled" << std::endl; -#endif - } - else - { - // Reclaim any allocated resources - } - - // Tell Execute() that we have Finished() - m_promise.set_value(); -} - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbService.hpp b/xroot_ssi_pb/XrdSsiPbService.hpp deleted file mode 100644 index c3b6894c56da319ea37261b45ec3fee99803c92e..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbService.hpp +++ /dev/null @@ -1,156 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief XRootD SSI server-side Service object management - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <XrdSsi/XrdSsiService.hh> -#include <XrdSsi/XrdSsiEntity.hh> -#include "XrdSsiPbRequestProc.hpp" - -namespace XrdSsiPb { - -/*! - * Service Object. - * - * Obtained using GetService() method of the XrdSsiPbServiceProvider factory - */ -template <typename RequestType, typename MetadataType, typename AlertType> -class Service : public XrdSsiService -{ -public: - Service() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Service() constructor" << std::endl; -#endif - } - virtual ~Service() { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ~Service() destructor" << std::endl; -#endif - } - - /*! - * Stop the Service. - * - * Requires some method to pass Finished(true) to in-flight Requests. This has been raised with Andy. - * Currently not implemented as it would require tracking all in-flight Requests by the application - * when this is really the job of the framework. - */ - virtual bool Stop() override - { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Service::Stop()" << std::endl; -#endif - return false; - } - - virtual void ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef) override; - - /*! - * Perform Request pre-authorisation and/or resource optimisation. - */ - virtual bool Prepare(XrdSsiErrInfo &eInfo, const XrdSsiResource &resource) override - { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Service::Prepare():" << std::endl; - std::cerr << "[DEBUG] Resource name: " << resource.rName << std::endl - << "[DEBUG] Resource user: " << resource.rUser << std::endl - << "[DEBUG] Resource info: " << resource.rInfo << std::endl - << "[DEBUG] Hosts to avoid: " << resource.hAvoid << std::endl - << "[DEBUG] Affinity: "; - - switch(resource.affinity) - { - case XrdSsiResource::None: std::cerr << "None" << std::endl; break; - case XrdSsiResource::Default: std::cerr << "Default" << std::endl; break; - case XrdSsiResource::Weak: std::cerr << "Weak" << std::endl; break; - case XrdSsiResource::Strong: std::cerr << "Strong" << std::endl; break; - case XrdSsiResource::Strict: std::cerr << "Strict" << std::endl; break; - } - - std::cerr << "[DEBUG] Resource options: " - << (resource.rOpts & XrdSsiResource::Reusable ? "Resuable " : "") - << (resource.rOpts & XrdSsiResource::Discard ? "Discard" : "") - << std::endl; -#endif - if(resource.client == nullptr || resource.client->name == nullptr) - { - eInfo.Set("Service::Prepare(): XRootD client name is not set. " - "Possible misconfiguration of the KRB5 or SSS keyfile.", EACCES); - return false; - } - -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Resource client name: " << resource.client->name << std::endl; -#endif - - return true; - } - - /*! - * Receive notification that a Request has been attached. - * - * This is required only if the service needs to make decisions on how to run a request based on whether - * it is attached or detached. See Sect. 3.3.1 "Detached Requests" in the XRootD SSI documentation. - */ - virtual bool Attach(XrdSsiErrInfo &eInfo, const std::string &handle, - XrdSsiRequest &reqRef, XrdSsiResource *resp) override - { -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] Service::Attach()" << std::endl; -#endif - return true; - } -}; - - - -/*! - * Bind a Request to a Request Processor and execute the Processor. - * - * The client calls its ProcessRequest() method to hand off its Request and Resource objects. The client's - * Request and Resource objects are transmitted to the server and passed into the service's ProcessRequest() - * method. - */ -template <typename RequestType, typename MetadataType, typename AlertType> -void Service<RequestType, MetadataType, AlertType>::ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef) -{ - XrdSsiPb::RequestProc<RequestType, MetadataType, AlertType> processor(resRef); - - // Bind the processor to the request. Inherits the BindRequest method from XrdSsiResponder. - -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] XrdSsiPbService::ProcessRequest(): Binding Processor to Request" << std::endl; -#endif - processor.BindRequest(reqRef); - - // Execute the request, upon return the processor is deleted - - processor.Execute(); - - // Tell the framework we have finished with the request object: unbind the request from the responder - -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] XrdSsiPbService::ProcessRequest(): Unbinding Processor from Request" << std::endl; -#endif - - processor.UnBindRequest(); -} - -} // namespace XrdSsiPb - diff --git a/xroot_ssi_pb/XrdSsiPbServiceClientSide.hpp b/xroot_ssi_pb/XrdSsiPbServiceClientSide.hpp deleted file mode 100644 index 888881b21638ba4d9d14ba1a376af30aed2a6725..0000000000000000000000000000000000000000 --- a/xroot_ssi_pb/XrdSsiPbServiceClientSide.hpp +++ /dev/null @@ -1,176 +0,0 @@ -/*! - * @project The CERN Tape Archive (CTA) - * @brief XRootD SSI client-side Service object management - * @copyright Copyright 2017 CERN - * @license This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef XRDSSI_DEBUG -#include <iostream> -#endif - -#include <XrdSsi/XrdSsiProvider.hh> -#include <XrdSsi/XrdSsiService.hh> -#include "XrdSsiPbException.hpp" -#include "XrdSsiPbRequest.hpp" - - - -/*! - * XrdSsiProviderClient is instantiated and managed by the SSI library - */ -extern XrdSsiProvider *XrdSsiProviderClient; - - - -namespace XrdSsiPb { - -// Constants - -const unsigned int DefaultResponseBufferSize = 1024; //!< Default size for the response buffer in bytes -const unsigned int DefaultRequestTimeout = 15; //!< Default XRootD request timeout in secs -// Better to get the default from XRD_REQUESTTIMEOUT if it is not set explicitly? -// Decide on order of precedence for setting request timeout - - - -/*! - * Convenience object to manage the XRootD SSI service on the client side - */ -template <typename RequestType, typename MetadataType, typename DataType, typename AlertType> -class ServiceClientSide -{ -public: - ServiceClientSide(const std::string &endpoint, const std::string &resource, - unsigned int response_bufsize = DefaultResponseBufferSize, - unsigned int request_tmo = DefaultRequestTimeout); - - virtual ~ServiceClientSide(); - - std::future<void> Send(const RequestType &request, MetadataType &response); - -private: - XrdSsiResource m_resource; //!< Requests are bound to this resource. As the resource is - //!< reusable, the lifetime of the resource is the same as the - //!< lifetime of the Service object. - - XrdSsiService *m_server_ptr; //!< Pointer to XRootD Server object - - unsigned int m_response_bufsize; //!< Buffer size for responses from the XRootD SSI server - unsigned int m_request_tmo; //!< Timeout for a response from the server -}; - - - -/*! - * Client-side Service Constructor - */ -template <typename RequestType, typename MetadataType, typename DataType, typename AlertType> -ServiceClientSide<RequestType, MetadataType, DataType, AlertType>:: -ServiceClientSide(const std::string &endpoint, const std::string &resource, - unsigned int response_bufsize, unsigned int request_tmo) : - m_resource(resource), - m_response_bufsize(response_bufsize), - m_request_tmo(request_tmo) -{ -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ServiceClientSide() constructor" << std::endl; -#endif - XrdSsiErrInfo eInfo; - - // Set Resource options - - m_resource.rOpts = XrdSsiResource::Reusable; //< Resource context may be cached and is reusable - - // Other possibly-useful options: - // - // (For specifying the tapeserver callback) - // - // XrdSsiResource::rInfo - // This option allows you to send additional out-of-band information to the server that will be executing - // the request. The information should be specified in CGI format (i.e. key=value[&key=value[...]]). This - // information is supplied to the server-side service in its corresponding request resource object. Note - // that restrictions apply for reusable resources. - // - // XrdSsiResource::rUser - // This is an arbitrary string that is meant to further identify the request. The SSI framework normally - // uses this information to tag log messages. It is also supplied to the server-side service in its - // corresponding request resource object. - - // Get the Service pointer - - if(!(m_server_ptr = XrdSsiProviderClient->GetService(eInfo, endpoint))) - { - throw XrdSsiException(eInfo); - } -} - - - -/*! - * Client-side Service Destructor - */ -template <typename RequestType, typename MetadataType, typename DataType, typename AlertType> -ServiceClientSide<RequestType, MetadataType, DataType, AlertType>::~ServiceClientSide() -{ -#ifdef XRDSSI_DEBUG - std::cerr << "[DEBUG] ServiceClientSide() destructor" << std::endl; - - if(!m_server_ptr->Stop()) - { - // As there is no way to get a list of in-flight Requests from the Service, the current Stop() - // implementation simply returns false. i.e. there is no way to know if we are shutting down - // cleanly or not. - - std::cerr << "[WARNING] ServiceClientSide object was destroyed before shutting down the Service." << std::endl; - } -#endif -} - - - -/*! - * Send a Request to the Service - * - * @param[in] request - * @param[out] response - * - * @returns future for Data/Stream requests. This return value can be ignored for Metadata-only Responses. - */ -template <typename RequestType, typename MetadataType, typename DataType, typename AlertType> -std::future<void> ServiceClientSide<RequestType, MetadataType, DataType, AlertType>::Send(const RequestType &request, MetadataType &response) -{ - // Instantiate the Request object - auto request_ptr = new Request<RequestType, MetadataType, DataType, AlertType>(request, m_response_bufsize, m_request_tmo); - auto metadata_future = request_ptr->GetMetadataFuture(); - auto data_future = request_ptr->GetDataFuture(); - - // Transfer ownership of the Request to the Service object. - m_server_ptr->ProcessRequest(*request_ptr, m_resource); - - // After ProcessRequest() returns, it is safe for request_ptr to go out-of-scope, as the framework - // will handle deletion of the Request object. It is also safe to delete the Resource; in our case - // we do not need to as we created a reusable Resource. - - // Wait synchronously for the framework to return its Response (or an exception) - response = metadata_future.get(); - - // Return the future for Data/Stream requests - return data_future; -} - -} // namespace XrdSsiPb -