diff --git a/catalogue/tests/modules/LogicalLibraryCatalogueTest.cpp b/catalogue/tests/modules/LogicalLibraryCatalogueTest.cpp index d2db960d3cf38314846a5de35a1f912ab5b67e63..627cf8075fe65ed62ecfdeafe19eb96ec54582c4 100644 --- a/catalogue/tests/modules/LogicalLibraryCatalogueTest.cpp +++ b/catalogue/tests/modules/LogicalLibraryCatalogueTest.cpp @@ -33,7 +33,7 @@ namespace unitTests { cta_catalogue_LogicalLibraryTest::cta_catalogue_LogicalLibraryTest() : m_dummyLog("dummy", "dummy"), - m_admin("admin", "admin", "admin"), + m_admin("admin", "admin", "admin", ""), m_vo(CatalogueTestUtils::getVo()), m_diskInstance(CatalogueTestUtils::getDiskInstance()), m_mediaType(CatalogueTestUtils::getMediaType()), @@ -748,4 +748,4 @@ TEST_P(cta_catalogue_LogicalLibraryTest, createTape) { } -} // namespace unitTests \ No newline at end of file +} // namespace unitTests diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index 1a9c4553c3feaca2fa04f3a253e01b8d2797fb5d..ce1b9fa003b06a03574ffdeb349ca98a6fcd9b77 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -166,8 +166,8 @@ CtaAdminCmd::CtaAdminCmd(int argc, const char *const *const argv) : { auto &admincmd = *(m_request.mutable_admincmd()); - m_request.set_client_cta_version(CTA_VERSION); - m_request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd.set_client_version(CTA_VERSION); + admincmd.set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); // Strip path from execname diff --git a/cmdline/standalone_cli_tools/CtaSendEvent.cpp b/cmdline/standalone_cli_tools/CtaSendEvent.cpp index c72fc6d981c07780a05329c6c7f5ccd10661062e..ef73f554e5907f021dd5973e495a3aa4e3fd174f 100644 --- a/cmdline/standalone_cli_tools/CtaSendEvent.cpp +++ b/cmdline/standalone_cli_tools/CtaSendEvent.cpp @@ -204,10 +204,6 @@ int exceptionThrowingMain(int argc, char *const *const argv) cta::xrd::Request request; cta::eos::Notification ¬ification = *(request.mutable_notification()); - // Set client version - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); - // Set configuration options XrdSsiPb::Config config(config_file, "cta"); config.set("resource", "/ctafrontend"); diff --git a/cmdline/standalone_cli_tools/CtaVerifyFile.cpp b/cmdline/standalone_cli_tools/CtaVerifyFile.cpp index 8113574bc3f659d3ecefe20df9c1eaba99531cb2..9348edb083195bb393a5227385dbbef49e841ddc 100644 --- a/cmdline/standalone_cli_tools/CtaVerifyFile.cpp +++ b/cmdline/standalone_cli_tools/CtaVerifyFile.cpp @@ -113,10 +113,6 @@ void sendVerifyRequest(const CmdLineArgs &cmdLineArgs, const std::string &archiv cta::xrd::Request request; cta::eos::Notification ¬ification = *(request.mutable_notification()); - // Set client version - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); - // Parse the command line arguments: fill the Notification fields fillNotification(notification, cmdLineArgs, archiveFileId); diff --git a/cmdline/standalone_cli_tools/change_storage_class/ChangeStorageClass.cpp b/cmdline/standalone_cli_tools/change_storage_class/ChangeStorageClass.cpp index 573b49e5c23b5605188a7e537d03e27819d798b1..c73764603816f199999390977451c9c47429d0e9 100644 --- a/cmdline/standalone_cli_tools/change_storage_class/ChangeStorageClass.cpp +++ b/cmdline/standalone_cli_tools/change_storage_class/ChangeStorageClass.cpp @@ -192,8 +192,8 @@ void ChangeStorageClass::storageClassExists() const { cta::xrd::Request request; const auto admincmd = request.mutable_admincmd(); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd->set_client_version(CTA_VERSION); + admincmd->set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd->set_cmd(cta::admin::AdminCmd::CMD_STORAGECLASS); admincmd->set_subcmd(cta::admin::AdminCmd::SUBCMD_LS); @@ -245,8 +245,8 @@ void ChangeStorageClass::updateStorageClassInCatalogue() const { const auto admincmd = request.mutable_admincmd(); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd->set_client_version(CTA_VERSION); + admincmd->set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd->set_cmd(cta::admin::AdminCmd::CMD_ARCHIVEFILE); admincmd->set_subcmd(cta::admin::AdminCmd::SUBCMD_CH); diff --git a/cmdline/standalone_cli_tools/common/CatalogueFetch.cpp b/cmdline/standalone_cli_tools/common/CatalogueFetch.cpp index 160316683975f55b2e63d7e45775cfecc2fb654e..4c94acc479a6b0e406e0da3ea1479307b5a762e6 100644 --- a/cmdline/standalone_cli_tools/common/CatalogueFetch.cpp +++ b/cmdline/standalone_cli_tools/common/CatalogueFetch.cpp @@ -114,8 +114,8 @@ std::tuple<std::string,std::string> CatalogueFetch::getInstanceAndFid(const std: cta::xrd::Request request; auto admincmd = request.mutable_admincmd(); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd->set_client_version(CTA_VERSION); + admincmd->set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd->set_cmd(cta::admin::AdminCmd::CMD_TAPEFILE); admincmd->set_subcmd(cta::admin::AdminCmd::SUBCMD_LS); auto new_opt = admincmd->add_option_uint64(); @@ -142,8 +142,8 @@ bool CatalogueFetch::vidExists(const std::string &vid, std::unique_ptr<XrdSsiPbS cta::xrd::Request request; auto admincmd = request.mutable_admincmd(); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd->set_client_version(CTA_VERSION); + admincmd->set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd->set_cmd(cta::admin::AdminCmd::CMD_TAPE); admincmd->set_subcmd(cta::admin::AdminCmd::SUBCMD_LS); @@ -185,4 +185,4 @@ void CatalogueFetch::handleResponse(const cta::xrd::Request &request, std::uniqu } } // cliTool -} // cta \ No newline at end of file +} // cta diff --git a/cmdline/standalone_cli_tools/eos_namespace_injection/EosNamespaceInjection.cpp b/cmdline/standalone_cli_tools/eos_namespace_injection/EosNamespaceInjection.cpp index 41240f3e7ff7908b80ab30c53cad4bdff1580538..5a3088e7b36ca6d518cac9e4f341477e55e90b1d 100644 --- a/cmdline/standalone_cli_tools/eos_namespace_injection/EosNamespaceInjection.cpp +++ b/cmdline/standalone_cli_tools/eos_namespace_injection/EosNamespaceInjection.cpp @@ -169,8 +169,8 @@ void EosNamespaceInjection::updateFxidAndDiskInstanceInCatalogue(const std::stri const auto admincmd = request.mutable_admincmd(); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd->set_client_version(CTA_VERSION); + admincmd->set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd->set_cmd(cta::admin::AdminCmd::CMD_ARCHIVEFILE); admincmd->set_subcmd(cta::admin::AdminCmd::SUBCMD_CH); @@ -248,8 +248,8 @@ bool EosNamespaceInjection::getMetaDataFromCatalogue(const uint64_t &archiveId) auto &admincmd = *(request.mutable_admincmd()); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd.set_client_version(CTA_VERSION); + admincmd.set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd.set_cmd(cta::admin::AdminCmd::CMD_TAPEFILE); admincmd.set_subcmd(cta::admin::AdminCmd::SUBCMD_LS); @@ -539,4 +539,4 @@ void EosNamespaceInjection::createTxtFileWithSkippedMetadata() const { } } -} // namespace cta::cliTool \ No newline at end of file +} // namespace cta::cliTool diff --git a/cmdline/standalone_cli_tools/restore_files/RestoreFilesCmd.cpp b/cmdline/standalone_cli_tools/restore_files/RestoreFilesCmd.cpp index 96265eb69ad6faa224a987c3597c61d47387538f..5688484cf22d2bcc3cd510efb0a2e9c613a70659 100644 --- a/cmdline/standalone_cli_tools/restore_files/RestoreFilesCmd.cpp +++ b/cmdline/standalone_cli_tools/restore_files/RestoreFilesCmd.cpp @@ -223,8 +223,8 @@ void RestoreFilesCmd::listDeletedFilesCta() const { auto &admincmd = *(request.mutable_admincmd()); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd.set_client_version(CTA_VERSION); + admincmd.set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd.set_cmd(cta::admin::AdminCmd::CMD_RECYCLETAPEFILE); admincmd.set_subcmd(cta::admin::AdminCmd::SUBCMD_LS); @@ -317,8 +317,8 @@ void RestoreFilesCmd::restoreDeletedFileCopyCta(const cta::admin::RecycleTapeFil auto &admincmd = *(request.mutable_admincmd()); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd.set_client_version(CTA_VERSION); + admincmd.set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd.set_cmd(cta::admin::AdminCmd::CMD_RECYCLETAPEFILE); admincmd.set_subcmd(cta::admin::AdminCmd::SUBCMD_RESTORE); @@ -479,8 +479,8 @@ bool RestoreFilesCmd::archiveFileExistsCTA(const uint64_t &archiveFileId) const auto &admincmd = *(request.mutable_admincmd()); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd.set_client_version(CTA_VERSION); + admincmd.set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd.set_cmd(cta::admin::AdminCmd::CMD_TAPEFILE); admincmd.set_subcmd(cta::admin::AdminCmd::SUBCMD_LS); @@ -683,8 +683,8 @@ std::pair<std::string,std::string> RestoreFilesCmd::getInstanceAndFidFromCTA(con cta::xrd::Request request; auto &admincmd = *(request.mutable_admincmd()); - request.set_client_cta_version(CTA_VERSION); - request.set_client_xrootd_ssi_protobuf_interface_version(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); + admincmd.set_client_version(CTA_VERSION); + admincmd.set_protobuf_tag(XROOTD_SSI_PROTOBUF_INTERFACE_VERSION); admincmd.set_cmd(cta::admin::AdminCmd::CMD_TAPEFILE); admincmd.set_subcmd(cta::admin::AdminCmd::SUBCMD_LS); auto new_opt = admincmd.add_option_uint64(); diff --git a/common/checksum/ChecksumBlobSerDeser.hpp b/common/checksum/ChecksumBlobSerDeser.hpp index 84dd76b2974c9cc38cdf8773fedf11a36e1cbc2c..5f37b007a056dd1a3de5c8bc5c939390251c2032 100644 --- a/common/checksum/ChecksumBlobSerDeser.hpp +++ b/common/checksum/ChecksumBlobSerDeser.hpp @@ -23,7 +23,7 @@ namespace cta { namespace checksum { -void ProtobufToChecksumBlob(const common::ChecksumBlob &p_csb, checksum::ChecksumBlob &csb) { +inline void ProtobufToChecksumBlob(const common::ChecksumBlob &p_csb, checksum::ChecksumBlob &csb) { csb.clear(); for(auto &cs : p_csb.cs()) { checksum::ChecksumType type; @@ -40,7 +40,7 @@ void ProtobufToChecksumBlob(const common::ChecksumBlob &p_csb, checksum::Checksu } } -void ChecksumBlobToProtobuf(const checksum::ChecksumBlob &csb, common::ChecksumBlob &p_csb) { +inline void ChecksumBlobToProtobuf(const checksum::ChecksumBlob &csb, common::ChecksumBlob &p_csb) { for(auto &cs : csb.getMap()) { common::ChecksumBlob::Checksum::Type type; switch(cs.first) { diff --git a/common/dataStructures/SecurityIdentity.cpp b/common/dataStructures/SecurityIdentity.cpp index 963e4449dee9e3db1827641bfecbbee790e42fe2..c83ad692eaf476ba292dab9932f0d5d05d0927f8 100644 --- a/common/dataStructures/SecurityIdentity.cpp +++ b/common/dataStructures/SecurityIdentity.cpp @@ -26,20 +26,26 @@ namespace dataStructures { //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -SecurityIdentity::SecurityIdentity() {} +SecurityIdentity::SecurityIdentity() : + authProtocol(Protocol::NONE) {} //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -SecurityIdentity::SecurityIdentity(const std::string& username, const std::string& host): - username(username), host(host) {} - +SecurityIdentity::SecurityIdentity(const std::string& username, const std::string& host) : + username(username), host(host), authProtocol(Protocol::NONE) {} //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -SecurityIdentity::SecurityIdentity(const std::string& username, const std::string& host, const std::string & clientHost): - username(username), host(host), clientHost(clientHost) {} +SecurityIdentity::SecurityIdentity(const std::string& username, const std::string& host, const std::string& clientHost, const std::string& auth) : + username(username), host(host), clientHost(clientHost), authProtocol(Protocol::NONE) { + if(!auth.empty()) { + // Map the client protocol string to enum value + auto proto_it = m_authProtoMap.find(auth); + authProtocol = proto_it != m_authProtoMap.end() ? proto_it->second : Protocol::OTHER; + } +} //------------------------------------------------------------------------------ // operator== diff --git a/common/dataStructures/SecurityIdentity.hpp b/common/dataStructures/SecurityIdentity.hpp index 57271dc23b5b7bd20e6cc7a4483bc20c5cdb0abf..e7cc7c9605a20cbba7117513a0522e441f4aabd4 100644 --- a/common/dataStructures/SecurityIdentity.hpp +++ b/common/dataStructures/SecurityIdentity.hpp @@ -27,16 +27,15 @@ namespace common { namespace dataStructures { /** - * This struct holds the information about who's issued the CTA command and from - * which host + * This struct holds the information about who issued the CTA command and from which host */ struct SecurityIdentity { SecurityIdentity(); - SecurityIdentity(const std::string & username, const std::string & host); + SecurityIdentity(const std::string& username, const std::string& host); - SecurityIdentity(const std::string & username, const std::string & host, const std::string & clientHost); + SecurityIdentity(const std::string& username, const std::string& host, const std::string& clientHost, const std::string& auth); bool operator==(const SecurityIdentity &rhs) const; @@ -44,10 +43,18 @@ struct SecurityIdentity { bool operator<(const SecurityIdentity &rhs) const; + // Security protocol used to connect + enum class Protocol { NONE, SSS, KRB5, GRPC_TOKEN, OTHER }; + const std::map<std::string, Protocol> m_authProtoMap = { + { "sss", Protocol::SSS }, + { "krb5", Protocol::KRB5 }, + { "grpc_token", Protocol::GRPC_TOKEN } + }; + std::string username; std::string host; std::string clientHost; - + Protocol authProtocol; }; // struct SecurityIdentity std::ostream &operator<<(std::ostream &os, const SecurityIdentity &obj); diff --git a/frontend/common/AdminCmd.cpp b/frontend/common/AdminCmd.cpp new file mode 100644 index 0000000000000000000000000000000000000000..45ea0dcb6cb8e80df374e3a22d10e0ce98e11ab1 --- /dev/null +++ b/frontend/common/AdminCmd.cpp @@ -0,0 +1,77 @@ +/* + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2023 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#include "cmdline/CtaAdminCmdParse.hpp" +#include "frontend/common/AdminCmd.hpp" + +namespace cta { +namespace frontend { + +AdminCmd::AdminCmd(const frontend::FrontendService& frontendService, + const common::dataStructures::SecurityIdentity& clientIdentity, + const admin::AdminCmd& adminCmd) : + m_cliIdentity(clientIdentity), + m_catalogue(frontendService.getCatalogue()), + m_scheduler(frontendService.getScheduler()), + m_lc(frontendService.getLogContext()) +{ + // Check that the user is authorized + m_scheduler.authorizeAdmin(m_cliIdentity, m_lc); + + // Validate the Protocol Buffer and import options into maps + importOptions(adminCmd); + m_clientVersion.ctaVersion = adminCmd.client_version(); + m_clientVersion.protobufTag = adminCmd.protobuf_tag(); +} + +xrd::Response AdminCmd::process() { + xrd::Response response; + + return response; +} + +void AdminCmd::importOptions(const admin::AdminCmd& adminCmd) +{ + // Validate the Protocol Buffer + validateCmd(adminCmd); + + // Import Boolean options + for(auto opt_it = adminCmd.option_bool().begin(); opt_it != adminCmd.option_bool().end(); ++opt_it) { + m_option_bool.insert(std::make_pair(opt_it->key(), opt_it->value())); + } + + // Import UInt64 options + for(auto opt_it = adminCmd.option_uint64().begin(); opt_it != adminCmd.option_uint64().end(); ++opt_it) { + m_option_uint64.insert(std::make_pair(opt_it->key(), opt_it->value())); + } + + // Import String options + for(auto opt_it = adminCmd.option_str().begin(); opt_it != adminCmd.option_str().end(); ++opt_it) { + m_option_str.insert(std::make_pair(opt_it->key(), opt_it->value())); + } + + // Import String List options + for(auto opt_it = adminCmd.option_str_list().begin(); opt_it != adminCmd.option_str_list().end(); ++opt_it) { + std::vector<std::string> items; + for(auto item_it = opt_it->item().begin(); item_it != opt_it->item().end(); ++item_it) { + items.push_back(*item_it); + } + m_option_str_list.insert(std::make_pair(opt_it->key(), items)); + } +} + +}} // namespace cta::frontend diff --git a/frontend/common/AdminCmd.hpp b/frontend/common/AdminCmd.hpp new file mode 100644 index 0000000000000000000000000000000000000000..841e6fef6e33053b7cc89a5b004feb69a4f7338f --- /dev/null +++ b/frontend/common/AdminCmd.hpp @@ -0,0 +1,62 @@ +/* + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2023 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#pragma once + + +#include "cta_frontend.pb.h" +#include "frontend/common/Version.hpp" +#include "frontend/common/FrontendService.hpp" + +namespace cta { +namespace frontend { + +class AdminCmd { +public: + AdminCmd(const frontend::FrontendService& frontendService, + const common::dataStructures::SecurityIdentity& clientIdentity, + const admin::AdminCmd& event); + + ~AdminCmd() = default; + + /*! + * Process the admin command + * + * @return Protobuf to return to the client + */ + xrd::Response process(); + +private: + /*! + * Convert protobuf options to maps + */ + void importOptions(const admin::AdminCmd& adminCmd); + + common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username, host, authentication + catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue + cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler + log::LogContext m_lc; //!< CTA Log Context + Version m_clientVersion; //!< CTA client (cta-admin) version + + // Command options extracted from protobuf + std::map<cta::admin::OptionBoolean::Key, bool> m_option_bool; //!< Boolean options + std::map<cta::admin::OptionUInt64::Key, uint64_t> m_option_uint64; //!< UInt64 options + std::map<cta::admin::OptionString::Key, std::string> m_option_str; //!< String options + std::map<cta::admin::OptionStrList::Key, std::vector<std::string>> m_option_str_list; //!< String List options +}; + +}} // namespace cta::frontend diff --git a/frontend/common/FrontendService.hpp b/frontend/common/FrontendService.hpp index ad5e38f2edbb78da51d04f912bb2c529f2b82a1f..775a5b7229943c32bf72d0b96d829ce68dba2b6a 100644 --- a/frontend/common/FrontendService.hpp +++ b/frontend/common/FrontendService.hpp @@ -32,6 +32,8 @@ class FrontendService { public: explicit FrontendService(const std::string& configFilename); + FrontendService(const FrontendService &) = delete; + ~FrontendService() = default; /*! @@ -72,7 +74,7 @@ public: /*! * Get the verification mount policy */ - std::optional<std::string> getVerificationMountPolicy() const { return m_verificationMountPolicy; } + std::string getVerificationMountPolicy() const { return m_verificationMountPolicy; } /*! * Get the endpoints for namespace queries @@ -100,7 +102,7 @@ private: std::string m_catalogue_conn_string; //!< The catalogue connection string (without the password) uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests std::optional<std::string> m_repackBufferURL; //!< The repack buffer URL - std::optional<std::string> m_verificationMountPolicy; //!< The mount policy for verification requests + std::string m_verificationMountPolicy; //!< The mount policy for verification requests cta::NamespaceMap_t m_namespaceMap; //!< Endpoints for namespace queries }; diff --git a/xroot_plugins/Versions.hpp b/frontend/common/Version.hpp similarity index 83% rename from xroot_plugins/Versions.hpp rename to frontend/common/Version.hpp index bf8eea78cc2b60ca1a1ccb96a21ac0d802a51289..aeb98250e66eaef6b44e3d16e6552716dbf4d4a8 100644 --- a/xroot_plugins/Versions.hpp +++ b/frontend/common/Version.hpp @@ -1,6 +1,6 @@ /* * @project The CERN Tape Archive (CTA) - * @copyright Copyright © 2021-2022 CERN + * @copyright Copyright © 2021-2023 CERN * @license This program is free software, distributed under the terms of the GNU General Public * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your @@ -14,23 +14,25 @@ * granted to it by virtue of its status as an Intergovernmental Organization or * submit itself to any jurisdiction. */ + #pragma once -namespace cta { namespace xrd { +namespace cta { +namespace frontend { /** * Structure to hold CTA versions */ - struct Versions { + struct Version { /** * CTA version major.minor */ std::string ctaVersion; /** - * xrootd-ssi-protobuf-interface version (tag) + * xrootd-ssi-protobuf-interface version/tag */ - std::string xrootdSsiProtoIntVersion; + std::string protobufTag; }; -}} \ No newline at end of file +}} // namespace cta::frontend diff --git a/frontend/common/WorkflowEvent.cpp b/frontend/common/WorkflowEvent.cpp new file mode 100644 index 0000000000000000000000000000000000000000..db624de509cb04472a11752c839351d769169efe --- /dev/null +++ b/frontend/common/WorkflowEvent.cpp @@ -0,0 +1,499 @@ +/* + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2023 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#include "common/checksum/ChecksumBlobSerDeser.hpp" +#include "PbException.hpp" +#include "WorkflowEvent.hpp" + +namespace cta { +namespace frontend { + +WorkflowEvent::WorkflowEvent(const frontend::FrontendService& frontendService, + const common::dataStructures::SecurityIdentity& clientIdentity, + const eos::Notification& event) : + m_event(event), + m_cliIdentity(clientIdentity), + m_catalogue(frontendService.getCatalogue()), + m_scheduler(frontendService.getScheduler()), + m_lc(frontendService.getLogContext()), + m_verificationMountPolicy(frontendService.getVerificationMountPolicy()) +{ + // Log event before processing. This corresponds to the entry in WFE.log in EOS. + { + const std::string& eventTypeName = Workflow_EventType_Name(event.wf().event()); + const std::string& eosInstanceName = event.wf().instance().name(); + const std::string& diskFilePath = event.file().lpath(); + const std::string& diskFileId = std::to_string(event.file().fid()); + log::ScopedParamContainer params(m_lc); + params.add("eventType", eventTypeName) + .add("eosInstance", eosInstanceName) + .add("diskFilePath", diskFilePath) + .add("diskFileId", diskFileId); + m_lc.log(log::INFO, "In WorkflowEvent::WorkflowEvent(): received event."); + } + // Validate that instance name in key used to authenticate == instance name in protocol buffer + if(m_cliIdentity.username != event.wf().instance().name()) { + // Special case: + // Allow KRB5 authentication for CLOSEW and PREPARE events, to allow operators to use a command line + // tool to resubmit failed archive or prepare requests. This is NOT permitted for DELETE events as we + // don't want files removed from the catalogue to be left in the disk namespace. + if(m_cliIdentity.authProtocol == common::dataStructures::SecurityIdentity::Protocol::KRB5 && + (event.wf().event() == eos::Workflow::CLOSEW || + event.wf().event() == eos::Workflow::PREPARE)) { + m_scheduler.authorizeAdmin(m_cliIdentity, m_lc); + m_cliIdentity.username = event.wf().instance().name(); + } else { + throw exception::PbException("Instance name \"" + event.wf().instance().name() + + "\" does not match key identifier \"" + m_cliIdentity.username + "\""); + } + } + // Refuse any workflow events for files in /eos/INSTANCE_NAME/proc/ + { + const std::string& longInstanceName = event.wf().instance().name(); + const bool longInstanceNameStartsWithEos = (0 == longInstanceName.find("eos")); + const std::string shortInstanceName = + longInstanceNameStartsWithEos ? longInstanceName.substr(3) : longInstanceName; + if(shortInstanceName.empty()) { + std::ostringstream msg; + msg << "Short instance name is an empty string: instance=" << longInstanceName; + throw exception::PbException(msg.str()); + } + const std::string procFullPath = std::string("/eos/") + shortInstanceName + "/proc/"; + if(event.file().lpath().find(procFullPath) == 0) { + std::ostringstream msg; + msg << "Cannot process a workflow event for a file in " << procFullPath << " instance=" << longInstanceName + << " event=" << Workflow_EventType_Name(event.wf().event()) << " lpath=" << event.file().lpath(); + throw exception::PbException(msg.str()); + } + } +} + +xrd::Response WorkflowEvent::process() { + xrd::Response response; + + switch(m_event.wf().event()) { + using namespace cta::eos; + + case Workflow::OPENW: + processOPENW(m_event, response); + break; + case Workflow::CREATE: + processCREATE(m_event, response); + break; + case Workflow::CLOSEW: + processCLOSEW(m_event, response); + break; + case Workflow::PREPARE: + processPREPARE(m_event, response); + break; + case Workflow::ABORT_PREPARE: + processABORT_PREPARE(m_event, response); + break; + case Workflow::DELETE: + processDELETE(m_event, response); + break; + case Workflow::UPDATE_FID: + processUPDATE_FID(m_event, response); + break; + default: + throw exception::PbException("Workflow event " + Workflow_EventType_Name(m_event.wf().event()) + + " is not implemented."); + } + return response; +} + +void WorkflowEvent::processOPENW(const eos::Notification& event, xrd::Response& response) { + // Create a log entry + log::ScopedParamContainer params(m_lc); + m_lc.log(log::INFO, "In WorkflowEvent::processOPENW(): ignoring OPENW event."); + + // Set response type + response.set_type(xrd::Response::RSP_SUCCESS); +} + +void WorkflowEvent::processCREATE(const eos::Notification& event, xrd::Response& response) { + // Validate received protobuf + checkIsNotEmptyString(event.cli().user().username(), "event.cli.user.username"); + checkIsNotEmptyString(event.cli().user().groupname(), "event.cli.user.groupname"); + + // Unpack message + common::dataStructures::RequesterIdentity requester; + requester.name = event.cli().user().username(); + requester.group = event.cli().user().groupname(); + + auto storageClassItor = event.file().xattr().find("sys.archive.storage_class"); + if(event.file().xattr().end() == storageClassItor) { + // Fall back to old xattr format + storageClassItor = event.file().xattr().find("CTA_StorageClass"); + if(event.file().xattr().end() == storageClassItor) { + throw exception::PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set"); + } + } + const std::string storageClass = storageClassItor->second; + if(storageClass.empty()) { + throw exception::PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string"); + } + + utils::Timer t; + uint64_t archiveFileId; + + // For testing, this storage class will always fail on CLOSEW. Allow it to pass CREATE and don't allocate an archive Id from the pool. + if(storageClassItor->second == "fail_on_closew_test") { + archiveFileId = std::numeric_limits<uint64_t>::max(); + } else { + archiveFileId = m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc); + } + + // Create a log entry + log::ScopedParamContainer params(m_lc); + params.add("diskFileId", std::to_string(event.file().fid())) + .add("diskFilePath", event.file().lpath()) + .add("fileId", archiveFileId) + .add("schedulerTime", t.secs()); + m_lc.log(log::INFO, "In WorkflowEvent::processCREATE(): assigning new archive file ID."); + + // Set ArchiveFileId in xattrs + response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.file_id", std::to_string(archiveFileId))); + + // Set the storage class in xattrs + response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.storage_class", storageClass)); + + // Set response type + response.set_type(xrd::Response::RSP_SUCCESS); +} + +void WorkflowEvent::processCLOSEW(const eos::Notification& event, xrd::Response& response) { + // Validate received protobuf + checkIsNotEmptyString(event.cli().user().username(), "event.cli.user.username"); + checkIsNotEmptyString(event.cli().user().groupname(), "event.cli.user.groupname"); + checkIsNotEmptyString(event.file().lpath(), "event.file.lpath"); + checkIsNotEmptyString(event.wf().instance().url(), "event.wf.instance.url"); + checkIsNotEmptyString(event.transport().report_url(), "event.transport.report_url"); + + // Unpack message + const auto storageClassItor = event.file().xattr().find("sys.archive.storage_class"); + if(event.file().xattr().end() == storageClassItor) { + throw exception::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class"); + } + + // For testing: this storage class will always fail + if(storageClassItor->second == "fail_on_closew_test") { + throw exception::UserError("File is in fail_on_closew_test storage class, which always fails."); + } + + auto storageClass = m_catalogue.StorageClass()->getStorageClass(storageClassItor->second); + + // Disallow archival of files above the specified limit + if(storageClass.vo.maxFileSize && event.file().size() > storageClass.vo.maxFileSize) { + throw exception::UserError("Archive request rejected: file size (" + std::to_string(event.file().size()) + + " bytes) exceeds maximum allowed size (" + std::to_string(storageClass.vo.maxFileSize) + " bytes)"); + } + + common::dataStructures::ArchiveRequest request; + checksum::ProtobufToChecksumBlob(event.file().csb(), request.checksumBlob); + request.diskFileInfo.owner_uid = event.file().owner().uid(); + request.diskFileInfo.gid = event.file().owner().gid(); + request.diskFileInfo.path = event.file().lpath(); + request.diskFileID = std::to_string(event.file().fid()); + request.fileSize = event.file().size(); + request.requester.name = event.cli().user().username(); + request.requester.group = event.cli().user().groupname(); + request.srcURL = event.wf().instance().url(); + request.storageClass = storageClassItor->second; + request.archiveReportURL = event.transport().report_url(); + request.archiveErrorReportURL = event.transport().error_report_url(); + request.creationLog.host = m_cliIdentity.host; + request.creationLog.username = m_cliIdentity.username; + request.creationLog.time = time(nullptr); + + log::ScopedParamContainer params(m_lc); + params.add("requesterInstance", event.wf().requester_instance()); + std::string logMessage = "In WorkflowEvent::processCLOSEW(): "; + + // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which + // must be converted to a valid uint64_t + const auto archiveFileIdItor = event.file().xattr().find("sys.archive.file_id"); + if(event.file().xattr().end() == archiveFileIdItor) { + logMessage += "sys.archive.file_id is not present in extended attributes"; + m_lc.log(log::INFO, logMessage); + throw exception::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); + } + const std::string archiveFileIdStr = archiveFileIdItor->second; + uint64_t archiveFileId = 0; + if((archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) + { + params.add("sys.archive.file_id", archiveFileIdStr); + logMessage += "sys.archive.file_id is not a positive integer"; + m_lc.log(log::INFO, logMessage); + throw exception::PbException("Invalid archiveFileID " + archiveFileIdStr); + } + params.add("fileId", archiveFileId); + + utils::Timer t; + + if(request.fileSize > 0) { + // Queue the request + std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc); + logMessage += "queued file for archive."; + params.add("schedulerTime", t.secs()); + params.add("archiveRequestId", archiveRequestAddr); + + // Add archive request reference to response as an extended attribute + response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr)); + } else { + logMessage += "ignoring zero-length file."; + } + + // Create a log entry + m_lc.log(log::INFO, logMessage); + + // Set response type + response.set_type(xrd::Response::RSP_SUCCESS); +} + +void WorkflowEvent::processPREPARE(const eos::Notification& event, xrd::Response& response) { + // Validate received protobuf + checkIsNotEmptyString(event.cli().user().username(), "event.cli.user.username"); + checkIsNotEmptyString(event.cli().user().groupname(), "event.cli.user.groupname"); + checkIsNotEmptyString(event.file().lpath(), "event.file.lpath"); + checkIsNotEmptyString(event.transport().dst_url(), "event.transport.dst_url"); + + // Unpack message + common::dataStructures::RetrieveRequest request; + request.requester.name = event.cli().user().username(); + request.requester.group = event.cli().user().groupname(); + request.dstURL = event.transport().dst_url(); + request.errorReportURL = event.transport().error_report_url(); + request.diskFileInfo.owner_uid = event.file().owner().uid(); + request.diskFileInfo.gid = event.file().owner().gid(); + request.diskFileInfo.path = event.file().lpath(); + request.creationLog.host = m_cliIdentity.host; + request.creationLog.username = m_cliIdentity.username; + request.creationLog.time = time(nullptr); + request.isVerifyOnly = event.wf().verify_only(); + if (request.isVerifyOnly) { + request.mountPolicy = m_verificationMountPolicy; + } + + // Vid is for tape verification use case (for dual-copy files) so normally is not specified + if(!event.wf().vid().empty()) { + request.vid = event.wf().vid(); + } + + // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be + // converted to a valid uint64_t + auto archiveFileIdItor = event.file().xattr().find("sys.archive.file_id"); + if(event.file().xattr().end() == archiveFileIdItor) { + // Fall back to the old xattr format + archiveFileIdItor = event.file().xattr().find("CTA_ArchiveFileId"); + if(event.file().xattr().end() == archiveFileIdItor) { + throw exception::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); + } + } + const std::string archiveFileIdStr = archiveFileIdItor->second; + if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) + { + throw exception::PbException("Invalid archiveFileID " + archiveFileIdStr); + } + + // Activity value is a string. The parameter might be present or not. + if(event.file().xattr().find("activity") != event.file().xattr().end()) { + request.activity = event.file().xattr().at("activity"); + } + + utils::Timer t; + + // Queue the request + std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc); + + // Create a log entry + log::ScopedParamContainer params(m_lc); + params.add("fileId", request.archiveFileID) + .add("schedulerTime", t.secs()) + .add("isVerifyOnly", request.isVerifyOnly) + .add("retrieveReqId", retrieveReqId); + if(static_cast<bool>(request.activity)) { + params.add("activity", request.activity.value()); + } + m_lc.log(log::INFO, "In WorkflowEvent::processPREPARE(): queued file for retrieve."); + + // Set response type and add retrieve request reference as an extended attribute. + response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId)); + response.set_type(xrd::Response::RSP_SUCCESS); +} + +void WorkflowEvent::processABORT_PREPARE(const eos::Notification& event, xrd::Response& response) { + // Validate received protobuf + checkIsNotEmptyString(event.cli().user().username(), "event.cli.user.username"); + checkIsNotEmptyString(event.cli().user().groupname(), "event.cli.user.groupname"); + + // Unpack message + common::dataStructures::CancelRetrieveRequest request; + request.requester.name = event.cli().user().username(); + request.requester.group = event.cli().user().groupname(); + + // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be + // converted to a valid uint64_t + auto archiveFileIdItor = event.file().xattr().find("sys.archive.file_id"); + if(event.file().xattr().end() == archiveFileIdItor) { + // Fall back to the old xattr format + archiveFileIdItor = event.file().xattr().find("CTA_ArchiveFileId"); + if(event.file().xattr().end() == archiveFileIdItor) { + throw exception::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); + } + } + const std::string archiveFileIdStr = archiveFileIdItor->second; + if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) + { + throw exception::PbException("Invalid archiveFileID " + archiveFileIdStr); + } + + // The request Id should be stored as an extended attribute + const auto retrieveRequestIdItor = event.file().xattr().find("sys.cta.objectstore.id"); + if(event.file().xattr().end() == retrieveRequestIdItor) { + throw exception::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id"); + } + const std::string retrieveRequestId = retrieveRequestIdItor->second; + request.retrieveRequestId = retrieveRequestId; + + // Queue the request + m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc); + + utils::Timer t; + + // Create a log entry + log::ScopedParamContainer params(m_lc); + params.add("fileId", request.archiveFileID) + .add("schedulerTime", t.secs()) + .add("retrieveRequestId", request.retrieveRequestId) + .add("diskFilePath", utils::midEllipsis(request.diskFileInfo.path, 100)); + m_lc.log(log::INFO, "In WorkflowEvent::processABORT_PREPARE(): canceled retrieve request."); + + // Set response type and remove reference to retrieve request in EOS extended attributes. + response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", "")); + response.set_type(xrd::Response::RSP_SUCCESS); +} + +void WorkflowEvent::processDELETE(const eos::Notification& event, xrd::Response& response) { + // Validate received protobuf + checkIsNotEmptyString(event.cli().user().username(), "event.cli.user.username"); + checkIsNotEmptyString(event.cli().user().groupname(), "event.cli.user.groupname"); + checkIsNotEmptyString(event.file().lpath(), "event.file.lpath"); + + // Unpack message + common::dataStructures::DeleteArchiveRequest request; + request.requester.name = event.cli().user().username(); + request.requester.group = event.cli().user().groupname(); + + std::string lpath = event.file().lpath(); + uint64_t diskFileId = event.file().fid(); + request.diskFilePath = lpath; + request.diskFileId = std::to_string(diskFileId); + request.diskInstance = m_cliIdentity.username; + // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which + // must be converted to a valid uint64_t + auto archiveFileIdItor = event.file().xattr().find("sys.archive.file_id"); + if(event.file().xattr().end() == archiveFileIdItor) { + // Fall back to the old xattr format + archiveFileIdItor = event.file().xattr().find("CTA_ArchiveFileId"); + if(event.file().xattr().end() == archiveFileIdItor) { + throw exception::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); + } + } + const std::string archiveFileIdStr = archiveFileIdItor->second; + if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) + { + throw exception::PbException("Invalid archiveFileID " + archiveFileIdStr); + } + + auto archiveRequestAddrItor = event.file().xattr().find("sys.cta.archive.objectstore.id"); + if(archiveRequestAddrItor != event.file().xattr().end()){ + //We have the ArchiveRequest's objectstore address. + std::string objectstoreAddress = archiveRequestAddrItor->second; + if(!objectstoreAddress.empty()){ + request.address = archiveRequestAddrItor->second; + } + } + + // Delete the file from the catalogue or from the objectstore if archive request is created + utils::Timer t; + log::TimingList tl; + try { + request.archiveFile = m_catalogue.ArchiveFile()->getArchiveFileById(request.archiveFileID); + tl.insertAndReset("catalogueGetArchiveFileByIdTime",t); + } catch (exception::Exception &ex){ + log::ScopedParamContainer spc(m_lc); + spc.add("fileId", request.archiveFileID); + m_lc.log(log::DEBUG, "Ignoring request to delete archive file from the catalogue, because it does not exist"); + } + m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc); + tl.insertAndReset("schedulerTime",t); + // Create a log entry + log::ScopedParamContainer params(m_lc); + params.add("fileId", request.archiveFileID) + .add("address", (request.address ? request.address.value() : "null")) + .add("filePath",request.diskFilePath); + tl.addToLog(params); + m_lc.log(log::INFO, "In WorkflowEvent::processDELETE(): archive file deleted."); + + // Set response type + response.set_type(xrd::Response::RSP_SUCCESS); +} + +void WorkflowEvent::processUPDATE_FID(const eos::Notification& event, xrd::Response& response) { + // Validate received protobuf + checkIsNotEmptyString(event.file().lpath(), "event.file.lpath"); + + // Unpack message + const std::string &diskInstance = m_cliIdentity.username; + const std::string &diskFilePath = event.file().lpath(); + const std::string diskFileId = std::to_string(event.file().fid()); + + // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be + // converted to a valid uint64_t + auto archiveFileIdItor = event.file().xattr().find("sys.archive.file_id"); + if(event.file().xattr().end() == archiveFileIdItor) { + // Fall back to the old xattr format + archiveFileIdItor = event.file().xattr().find("CTA_ArchiveFileId"); + if(event.file().xattr().end() == archiveFileIdItor) { + throw exception::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); + } + } + const std::string archiveFileIdStr = archiveFileIdItor->second; + const uint64_t archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10); + if(0 == archiveFileId) { + throw exception::PbException("Invalid archiveFileID " + archiveFileIdStr); + } + + // Update the disk file ID + utils::Timer t; + m_catalogue.ArchiveFile()->updateDiskFileId(archiveFileId, diskInstance, diskFileId); + + // Create a log entry + log::ScopedParamContainer params(m_lc); + params.add("fileId", archiveFileId) + .add("schedulerTime", t.secs()) + .add("diskInstance", diskInstance) + .add("diskFilePath", diskFilePath) + .add("diskFileId", diskFileId); + m_lc.log(log::INFO, "In WorkflowEvent::processUPDATE_FID(): updated disk file ID."); + + // Set response type + response.set_type(xrd::Response::RSP_SUCCESS); +} + +}} // namespace cta::frontend diff --git a/frontend/common/WorkflowEvent.hpp b/frontend/common/WorkflowEvent.hpp new file mode 100644 index 0000000000000000000000000000000000000000..aac877a0f18efca6c719d08898da64be22cf49a0 --- /dev/null +++ b/frontend/common/WorkflowEvent.hpp @@ -0,0 +1,75 @@ +/* + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2023 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#pragma once + +#include "cta_frontend.pb.h" +#include "frontend/common/FrontendService.hpp" + +namespace cta { +namespace frontend { + +class WorkflowEvent { +public: + WorkflowEvent(const frontend::FrontendService& frontendService, + const common::dataStructures::SecurityIdentity& clientIdentity, + const eos::Notification& event); + + ~WorkflowEvent() = default; + + /*! + * Process the Workflow event + * + * @return Protobuf to return to the client + */ + xrd::Response process(); + +private: + /*! + * Handlers for each Workflow event type + * + * Note: The OPENW event is not handled by CTA, as files destined for tape are immutable. + * + * @param[in] event Workflow event and metadata received from client + * @param[out] response Response protobuf to return to client + */ + void processOPENW (const eos::Notification& event, xrd::Response& response); //!< Open for write event + void processCREATE (const eos::Notification& event, xrd::Response& response); //!< New archive file ID event + void processCLOSEW (const eos::Notification& event, xrd::Response& response); //!< Archive file event + void processPREPARE (const eos::Notification& event, xrd::Response& response); //!< Retrieve file event + void processABORT_PREPARE (const eos::Notification& event, xrd::Response& response); //!< Abort retrieve file event + void processDELETE (const eos::Notification& event, xrd::Response& response); //!< Delete file event + void processUPDATE_FID (const eos::Notification& event, xrd::Response& response); //!< Update disk file ID event + + /*! + * Throw an exception for empty protocol buffer strings + */ + void checkIsNotEmptyString(const std::string& value, const std::string& name) const { + if(value.empty()) { + throw exception::PbException("Protocol buffer field " + name + " is an empty string."); + } + } + + eos::Notification m_event; //!< Workflow Event protocol buffer + common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username, host, authentication + catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue + cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler + log::LogContext m_lc; //!< CTA Log Context + std::string m_verificationMountPolicy; //!< Verification mount policy +}; + +}} // namespace cta::frontend diff --git a/xroot_plugins/CMakeLists.txt b/xroot_plugins/CMakeLists.txt index c8b7a40aaf2a37cac6a9d57ca1c9b7e6d63d2795..748c8b5d339eb55dee650e27197d3b34b12d167e 100644 --- a/xroot_plugins/CMakeLists.txt +++ b/xroot_plugins/CMakeLists.txt @@ -38,6 +38,8 @@ add_library(XrdSsiCta MODULE XrdSsiCtaServiceProvider.cpp XrdSsiCtaRequestProc.c ../cmdline/CtaAdminCmdParse.cpp ../frontend/common/Config.cpp ../frontend/common/FrontendService.cpp + ../frontend/common/WorkflowEvent.cpp + ../frontend/common/AdminCmd.cpp ../frontend/common/GrpcClient.cpp ../frontend/common/GrpcEndpoint.cpp) if(CTA_USE_PGSCHED) target_link_libraries(XrdSsiCta ${XROOTD_XRDSSI_LIB} XrdSsiLib XrdSsiPbEosCta ctascheduler ctacommon ctacatalogue diff --git a/xroot_plugins/XrdCtaVersion.hpp b/xroot_plugins/XrdCtaVersion.hpp index 8f6e712a066758e722405bd7e40a9ea1f043f32c..1313128fece3f71a52b414b76de28f409250f4e4 100644 --- a/xroot_plugins/XrdCtaVersion.hpp +++ b/xroot_plugins/XrdCtaVersion.hpp @@ -42,8 +42,8 @@ class VersionStream: public XrdCtaStream{ private: static constexpr const char* const LOG_SUFFIX = "VersionStream"; //!< Identifier for log messages - Versions m_client_versions; - Versions m_server_versions; + frontend::Version m_client_versions; + frontend::Version m_server_versions; std::string m_catalogue_conn_string; std::string m_catalogue_version; bool m_is_upgrading; @@ -73,7 +73,7 @@ VersionStream::VersionStream(const RequestMessage &requestMsg, cta::catalogue::C == catalogue::SchemaVersion::Status::UPGRADING) { XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "VersionStream() constructor"); m_server_versions.ctaVersion = CTA_VERSION; - m_server_versions.xrootdSsiProtoIntVersion = XROOTD_SSI_PROTOBUF_INTERFACE_VERSION; + m_server_versions.protobufTag = XROOTD_SSI_PROTOBUF_INTERFACE_VERSION; } int VersionStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { @@ -82,10 +82,10 @@ int VersionStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { auto version = record.mutable_version_item(); auto client_version = version->mutable_client_version(); client_version->set_cta_version(m_client_versions.ctaVersion); - client_version->set_xrootd_ssi_protobuf_interface_version(m_client_versions.xrootdSsiProtoIntVersion); + client_version->set_xrootd_ssi_protobuf_interface_version(m_client_versions.protobufTag); auto server_version = version->mutable_server_version(); server_version->set_cta_version(m_server_versions.ctaVersion); - server_version->set_xrootd_ssi_protobuf_interface_version(m_server_versions.xrootdSsiProtoIntVersion); + server_version->set_xrootd_ssi_protobuf_interface_version(m_server_versions.protobufTag); version->set_catalogue_connection_string(m_catalogue_conn_string); version->set_catalogue_version(m_catalogue_version); version->set_is_upgrading(m_is_upgrading); diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 66d64a57e6086b8421eb62edbdfff4c30f52de2a..c16a60b9df7ef5be50dd01570899662a8b9d89bb 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -18,7 +18,6 @@ #include <limits> #include <sstream> #include <string> -#include <XrdSsiPbException.hpp> #include "catalogue/CreateMountPolicyAttributes.hpp" #include "catalogue/CreateTapeAttributes.hpp" @@ -51,6 +50,10 @@ #include "XrdCtaVirtualOrganizationLs.hpp" #include "XrdSsiCtaRequestMessage.hpp" +#include "frontend/common/PbException.hpp" +#include "frontend/common/AdminCmd.hpp" +#include "frontend/common/WorkflowEvent.hpp" + namespace cta { namespace xrd { @@ -80,8 +83,8 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons // Validate the Protocol Buffer and import options into maps importOptions(request.admincmd()); - m_client_versions.ctaVersion = request.client_cta_version(); - m_client_versions.xrootdSsiProtoIntVersion = request.client_xrootd_ssi_protobuf_interface_version(); + m_client_versions.ctaVersion = request.admincmd().client_version(); + m_client_versions.protobufTag = request.admincmd().protobuf_tag(); try { // Map the <Cmd, SubCmd> to a method @@ -327,7 +330,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons break; default: - throw XrdSsiPb::PbException("Admin command pair <" + + throw exception::PbException("Admin command pair <" + AdminCmd_Cmd_Name(request.admincmd().cmd()) + ", " + AdminCmd_SubCmd_Name(request.admincmd().subcmd()) + "> is not implemented."); @@ -335,7 +338,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons // Log the admin command logAdminCmd(__FUNCTION__, "success", "", request.admincmd(), t); - } catch(XrdSsiPb::PbException &ex) { + } catch(exception::PbException &ex) { logAdminCmd(__FUNCTION__, "failure", ex.what(), request.admincmd(), t); throw ex; } catch(cta::exception::UserError &ex) { @@ -349,509 +352,23 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons throw ex; } break; - } // end case Request::kAdmincmd - - case Request::kNotification: - // Log event before processing, same log as in WFE.log on eos side - { - const std::string &eventTypeName = Workflow_EventType_Name(request.notification().wf().event()); - const std::string &eosInstanceName = request.notification().wf().instance().name(); - const std::string &diskFilePath = request.notification().file().lpath(); - const std::string &diskFileId = std::to_string(request.notification().file().fid()); - cta::log::ScopedParamContainer params(m_lc); - params.add("eventType", eventTypeName) - .add("eosInstance", eosInstanceName) - .add("diskFilePath", diskFilePath) - .add("diskFileId", diskFileId); - m_lc.log(cta::log::INFO, "In RequestMessage::process(): processing SSI event"); - - } - // Validate that instance name in key used to authenticate matches instance name in Protocol buffer - if(m_cliIdentity.username != request.notification().wf().instance().name()) { - // Special case: allow KRB5 authentication for CLOSEW and PREPARE events, to allow operators - // to use a command line tool to resubmit failed archive or prepare requests. This is NOT - // permitted for DELETE events as we don't want files removed from the catalogue to be left - // in the EOS namespace. - if(m_protocol == Protocol::KRB5 && - (request.notification().wf().event() == cta::eos::Workflow::CLOSEW || - request.notification().wf().event() == cta::eos::Workflow::PREPARE)) { - m_scheduler.authorizeAdmin(m_cliIdentity, m_lc); - m_cliIdentity.username = request.notification().wf().instance().name(); - } else { - throw XrdSsiPb::PbException("Instance name \"" + request.notification().wf().instance().name() + - "\" does not match key identifier \"" + m_cliIdentity.username + "\""); - } - } - - // Refuse any workflow events for files in /eos/INSTANCE_NAME/proc/ - { - const std::string &longInstanceName = request.notification().wf().instance().name(); - const bool longInstanceNameStartsWithEos = 0 == longInstanceName.find("eos"); - const std::string shortInstanceName = - longInstanceNameStartsWithEos ? longInstanceName.substr(3) : longInstanceName; - if(shortInstanceName.empty()) { - std::ostringstream msg; - msg << "Short instance name is an empty string: instance=" << longInstanceName; - throw XrdSsiPb::PbException(msg.str()); - } - const std::string procFullPath = std::string("/eos/") + shortInstanceName + "/proc/"; - if(request.notification().file().lpath().find(procFullPath) == 0) { - std::ostringstream msg; - msg << "Cannot process a workflow event for a file in " << procFullPath << " instance=" << longInstanceName - << " event=" << Workflow_EventType_Name(request.notification().wf().event()) << " lpath=" << - request.notification().file().lpath(); - throw XrdSsiPb::PbException(msg.str()); - } - } - - // Map the Workflow Event to a method - switch(request.notification().wf().event()) { - using namespace cta::eos; - - case Workflow::OPENW: - processOPENW (request.notification(), response); - break; - case Workflow::CREATE: - processCREATE (request.notification(), response); - break; - case Workflow::CLOSEW: - processCLOSEW (request.notification(), response); - break; - case Workflow::PREPARE: - processPREPARE(request.notification(), response); - break; - case Workflow::ABORT_PREPARE: - processABORT_PREPARE(request.notification(), response); - break; - case Workflow::DELETE: - processDELETE (request.notification(), response); - break; - case Workflow::UPDATE_FID: - processUPDATE_FID (request.notification(), response); - break; + } // end case Request::kAdmincmd - default: - throw XrdSsiPb::PbException("Workflow event " + - Workflow_EventType_Name(request.notification().wf().event()) + - " is not implemented."); - } - break; + case Request::kNotification: { + frontend::WorkflowEvent wfe(m_service.getFrontendService(), m_cliIdentity, request.notification()); + response = wfe.process(); + break; + } // end case Request::kNotification case Request::REQUEST_NOT_SET: - throw XrdSsiPb::PbException("Request message has not been set."); + throw exception::PbException("Request message has not been set."); default: - throw XrdSsiPb::PbException("Unrecognized Request message. " + throw exception::PbException("Unrecognized Request message. " "Possible Protocol Buffer version mismatch between client and server."); } } - - -// EOS Workflow commands - -void RequestMessage::processOPENW(const cta::eos::Notification ¬ification, cta::xrd::Response &response) -{ - // Create a log entry - - cta::log::ScopedParamContainer params(m_lc); - m_lc.log(cta::log::INFO, "In RequestMessage::processOPENW(): ignoring OPENW event."); - - // Set response type - - response.set_type(cta::xrd::Response::RSP_SUCCESS); -} - - - -void RequestMessage::processCREATE(const cta::eos::Notification ¬ification, cta::xrd::Response &response) -{ - // Validate received protobuf - checkIsNotEmptyString(notification.cli().user().username(), "notification.cli.user.username"); - checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname"); - - // Unpack message - cta::common::dataStructures::RequesterIdentity requester; - requester.name = notification.cli().user().username(); - requester.group = notification.cli().user().groupname(); - - auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class"); - if(notification.file().xattr().end() == storageClassItor) { - // Fall back to old xattr format - storageClassItor = notification.file().xattr().find("CTA_StorageClass"); - if(notification.file().xattr().end() == storageClassItor) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is not set"); - } - } - const std::string storageClass = storageClassItor->second; - if(storageClass.empty()) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": sys.archive.storage_class extended attribute is set to an empty string"); - } - - cta::utils::Timer t; - uint64_t archiveFileId; - - // For testing, this storage class will always fail on CLOSEW. Allow it to pass CREATE and don't allocate an archive Id from the pool. - if(storageClassItor->second == "fail_on_closew_test") { - archiveFileId = std::numeric_limits<uint64_t>::max(); - } else { - archiveFileId = m_scheduler.checkAndGetNextArchiveFileId(m_cliIdentity.username, storageClass, requester, m_lc); - } - - // Create a log entry - cta::log::ScopedParamContainer params(m_lc); - params.add("diskFileId", std::to_string(notification.file().fid())) - .add("diskFilePath", notification.file().lpath()) - .add("fileId", archiveFileId) - .add("schedulerTime", t.secs()); - m_lc.log(cta::log::INFO, "In RequestMessage::processCREATE(): assigning new archive file ID."); - - // Set ArchiveFileId in xattrs - response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.file_id", std::to_string(archiveFileId))); - - // Set the storage class in xattrs - response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.archive.storage_class", storageClass)); - - // Set response type - response.set_type(cta::xrd::Response::RSP_SUCCESS); -} - - - -void RequestMessage::processCLOSEW(const cta::eos::Notification ¬ification, cta::xrd::Response &response) -{ - // Validate received protobuf - checkIsNotEmptyString(notification.cli().user().username(), "notification.cli.user.username"); - checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname"); - checkIsNotEmptyString(notification.file().lpath(), "notification.file.lpath"); - checkIsNotEmptyString(notification.wf().instance().url(), "notification.wf.instance.url"); - checkIsNotEmptyString(notification.transport().report_url(), "notification.transport.report_url"); - - // Unpack message - const auto storageClassItor = notification.file().xattr().find("sys.archive.storage_class"); - if(notification.file().xattr().end() == storageClassItor) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.storage_class"); - } - - // For testing: this storage class will always fail - if(storageClassItor->second == "fail_on_closew_test") { - throw cta::exception::UserError("File is in fail_on_closew_test storage class, which always fails."); - } - - auto storageClass = m_catalogue.StorageClass()->getStorageClass(storageClassItor->second); - - // Disallow archival of files above the specified limit - if(storageClass.vo.maxFileSize && notification.file().size() > storageClass.vo.maxFileSize) { - throw exception::UserError("Archive request rejected: file size (" + std::to_string(notification.file().size()) + - " bytes) exceeds maximum allowed size (" + std::to_string(storageClass.vo.maxFileSize) + " bytes)"); - } - - cta::common::dataStructures::ArchiveRequest request; - checksum::ProtobufToChecksumBlob(notification.file().csb(), request.checksumBlob); - request.diskFileInfo.owner_uid = notification.file().owner().uid(); - request.diskFileInfo.gid = notification.file().owner().gid(); - request.diskFileInfo.path = notification.file().lpath(); - request.diskFileID = std::to_string(notification.file().fid()); - request.fileSize = notification.file().size(); - request.requester.name = notification.cli().user().username(); - request.requester.group = notification.cli().user().groupname(); - request.srcURL = notification.wf().instance().url(); - request.storageClass = storageClassItor->second; - request.archiveReportURL = notification.transport().report_url(); - request.archiveErrorReportURL = notification.transport().error_report_url(); - request.creationLog.host = m_cliIdentity.host; - request.creationLog.username = m_cliIdentity.username; - request.creationLog.time = time(nullptr); - - cta::log::ScopedParamContainer params(m_lc); - params.add("requesterInstance", notification.wf().requester_instance()); - std::string logMessage = "In RequestMessage::processCLOSEW(): "; - - // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which - // must be converted to a valid uint64_t - const auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id"); - if(notification.file().xattr().end() == archiveFileIdItor) { - logMessage += "sys.archive.file_id is not present in extended attributes"; - m_lc.log(cta::log::INFO, logMessage); - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); - } - const std::string archiveFileIdStr = archiveFileIdItor->second; - uint64_t archiveFileId = 0; - if((archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) - { - params.add("sys.archive.file_id", archiveFileIdStr); - logMessage += "sys.archive.file_id is not a positive integer"; - m_lc.log(cta::log::INFO, logMessage); - throw XrdSsiPb::PbException("Invalid archiveFileID " + archiveFileIdStr); - } - params.add("fileId", archiveFileId); - - cta::utils::Timer t; - - if(request.fileSize > 0) { - // Queue the request - std::string archiveRequestAddr = m_scheduler.queueArchiveWithGivenId(archiveFileId, m_cliIdentity.username, request, m_lc); - logMessage += "queued file for archive."; - params.add("schedulerTime", t.secs()); - params.add("archiveRequestId", archiveRequestAddr); - - // Add archive request reference to response as an extended attribute - response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", archiveRequestAddr)); - } else { - logMessage += "ignoring zero-length file."; - } - - // Create a log entry - m_lc.log(cta::log::INFO, logMessage); - - // Set response type - response.set_type(cta::xrd::Response::RSP_SUCCESS); -} - - -void RequestMessage::processPREPARE(const cta::eos::Notification ¬ification, cta::xrd::Response &response) -{ - // Validate received protobuf - checkIsNotEmptyString(notification.cli().user().username(), "notification.cli.user.username"); - checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname"); - checkIsNotEmptyString(notification.file().lpath(), "notification.file.lpath"); - checkIsNotEmptyString(notification.transport().dst_url(), "notification.transport.dst_url"); - - // Unpack message - cta::common::dataStructures::RetrieveRequest request; - request.requester.name = notification.cli().user().username(); - request.requester.group = notification.cli().user().groupname(); - request.dstURL = notification.transport().dst_url(); - request.errorReportURL = notification.transport().error_report_url(); - request.diskFileInfo.owner_uid = notification.file().owner().uid(); - request.diskFileInfo.gid = notification.file().owner().gid(); - request.diskFileInfo.path = notification.file().lpath(); - request.creationLog.host = m_cliIdentity.host; - request.creationLog.username = m_cliIdentity.username; - request.creationLog.time = time(nullptr); - request.isVerifyOnly = notification.wf().verify_only(); - if (request.isVerifyOnly) { - request.mountPolicy = m_verificationMountPolicy; - } - - // Vid is for tape verification use case (for dual-copy files) so normally is not specified - if(!notification.wf().vid().empty()) { - request.vid = notification.wf().vid(); - } - - // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be - // converted to a valid uint64_t - auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id"); - if(notification.file().xattr().end() == archiveFileIdItor) { - // Fall back to the old xattr format - archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId"); - if(notification.file().xattr().end() == archiveFileIdItor) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); - } - } - const std::string archiveFileIdStr = archiveFileIdItor->second; - if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) - { - throw XrdSsiPb::PbException("Invalid archiveFileID " + archiveFileIdStr); - } - - // Activity value is a string. The parameter might be present or not. - if(notification.file().xattr().find("activity") != notification.file().xattr().end()) { - request.activity = notification.file().xattr().at("activity"); - } - - cta::utils::Timer t; - - // Queue the request - std::string retrieveReqId = m_scheduler.queueRetrieve(m_cliIdentity.username, request, m_lc); - - // Create a log entry - cta::log::ScopedParamContainer params(m_lc); - params.add("fileId", request.archiveFileID) - .add("schedulerTime", t.secs()) - .add("isVerifyOnly", request.isVerifyOnly) - .add("retrieveReqId", retrieveReqId); - if(static_cast<bool>(request.activity)) { - params.add("activity", request.activity.value()); - } - m_lc.log(cta::log::INFO, "In RequestMessage::processPREPARE(): queued file for retrieve."); - - // Set response type and add retrieve request reference as an extended attribute. - response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", retrieveReqId)); - response.set_type(cta::xrd::Response::RSP_SUCCESS); -} - - - -void RequestMessage::processABORT_PREPARE(const cta::eos::Notification ¬ification, cta::xrd::Response &response) -{ - // Validate received protobuf - checkIsNotEmptyString(notification.cli().user().username(), "notification.cli.user.username"); - checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname"); - - // Unpack message - cta::common::dataStructures::CancelRetrieveRequest request; - request.requester.name = notification.cli().user().username(); - request.requester.group = notification.cli().user().groupname(); - - // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be - // converted to a valid uint64_t - auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id"); - if(notification.file().xattr().end() == archiveFileIdItor) { - // Fall back to the old xattr format - archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId"); - if(notification.file().xattr().end() == archiveFileIdItor) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); - } - } - const std::string archiveFileIdStr = archiveFileIdItor->second; - if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) - { - throw XrdSsiPb::PbException("Invalid archiveFileID " + archiveFileIdStr); - } - - // The request Id should be stored as an extended attribute - const auto retrieveRequestIdItor = notification.file().xattr().find("sys.cta.objectstore.id"); - if(notification.file().xattr().end() == retrieveRequestIdItor) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.cta.objectstore.id"); - } - const std::string retrieveRequestId = retrieveRequestIdItor->second; - request.retrieveRequestId = retrieveRequestId; - - // Queue the request - m_scheduler.abortRetrieve(m_cliIdentity.username, request, m_lc); - - cta::utils::Timer t; - - // Create a log entry - cta::log::ScopedParamContainer params(m_lc); - params.add("fileId", request.archiveFileID) - .add("schedulerTime", t.secs()) - .add("retrieveRequestId", request.retrieveRequestId) - .add("diskFilePath", cta::utils::midEllipsis(request.diskFileInfo.path, 100)); - m_lc.log(cta::log::INFO, "In RequestMessage::processABORT_PREPARE(): canceled retrieve request."); - - // Set response type and remove reference to retrieve request in EOS extended attributes. - response.mutable_xattr()->insert(google::protobuf::MapPair<std::string,std::string>("sys.cta.objectstore.id", "")); - response.set_type(cta::xrd::Response::RSP_SUCCESS); -} - - - -void RequestMessage::processDELETE(const cta::eos::Notification ¬ification, cta::xrd::Response &response) -{ - // Validate received protobuf - checkIsNotEmptyString(notification.cli().user().username(), "notification.cli.user.username"); - checkIsNotEmptyString(notification.cli().user().groupname(), "notification.cli.user.groupname"); - checkIsNotEmptyString(notification.file().lpath(), "notification.file.lpath"); - - // Unpack message - cta::common::dataStructures::DeleteArchiveRequest request; - request.requester.name = notification.cli().user().username(); - request.requester.group = notification.cli().user().groupname(); - - std::string lpath = notification.file().lpath(); - uint64_t diskFileId = notification.file().fid(); - request.diskFilePath = lpath; - request.diskFileId = std::to_string(diskFileId); - request.diskInstance = m_cliIdentity.username; - // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which - // must be converted to a valid uint64_t - auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id"); - if(notification.file().xattr().end() == archiveFileIdItor) { - // Fall back to the old xattr format - archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId"); - if(notification.file().xattr().end() == archiveFileIdItor) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); - } - } - const std::string archiveFileIdStr = archiveFileIdItor->second; - if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) - { - throw XrdSsiPb::PbException("Invalid archiveFileID " + archiveFileIdStr); - } - - auto archiveRequestAddrItor = notification.file().xattr().find("sys.cta.archive.objectstore.id"); - if(archiveRequestAddrItor != notification.file().xattr().end()){ - //We have the ArchiveRequest's objectstore address. - std::string objectstoreAddress = archiveRequestAddrItor->second; - if(!objectstoreAddress.empty()){ - request.address = archiveRequestAddrItor->second; - } - } - - // Delete the file from the catalogue or from the objectstore if archive request is created - cta::utils::Timer t; - cta::log::TimingList tl; - try { - request.archiveFile = m_catalogue.ArchiveFile()->getArchiveFileById(request.archiveFileID); - tl.insertAndReset("catalogueGetArchiveFileByIdTime",t); - } catch (cta::exception::Exception &ex){ - log::ScopedParamContainer spc(m_lc); - spc.add("fileId", request.archiveFileID); - m_lc.log(log::DEBUG, "Ignoring request to delete archive file from the catalogue, because it does not exist"); - } - m_scheduler.deleteArchive(m_cliIdentity.username, request, m_lc); - tl.insertAndReset("schedulerTime",t); - // Create a log entry - cta::log::ScopedParamContainer params(m_lc); - params.add("fileId", request.archiveFileID) - .add("address", (request.address ? request.address.value() : "null")) - .add("filePath",request.diskFilePath); - tl.addToLog(params); - m_lc.log(cta::log::INFO, "In RequestMessage::processDELETE(): archive file deleted."); - - // Set response type - response.set_type(cta::xrd::Response::RSP_SUCCESS); -} - - - -void RequestMessage::processUPDATE_FID(const cta::eos::Notification ¬ification, cta::xrd::Response &response) -{ - // Validate received protobuf - checkIsNotEmptyString(notification.file().lpath(), "notification.file.lpath"); - - // Unpack message - const std::string &diskInstance = m_cliIdentity.username; - const std::string &diskFilePath = notification.file().lpath(); - const std::string diskFileId = std::to_string(notification.file().fid()); - - // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which must be - // converted to a valid uint64_t - auto archiveFileIdItor = notification.file().xattr().find("sys.archive.file_id"); - if(notification.file().xattr().end() == archiveFileIdItor) { - // Fall back to the old xattr format - archiveFileIdItor = notification.file().xattr().find("CTA_ArchiveFileId"); - if(notification.file().xattr().end() == archiveFileIdItor) { - throw XrdSsiPb::PbException(std::string(__FUNCTION__) + ": Failed to find the extended attribute named sys.archive.file_id"); - } - } - const std::string archiveFileIdStr = archiveFileIdItor->second; - const uint64_t archiveFileId = strtoul(archiveFileIdStr.c_str(), nullptr, 10); - if(0 == archiveFileId) { - throw XrdSsiPb::PbException("Invalid archiveFileID " + archiveFileIdStr); - } - - // Update the disk file ID - cta::utils::Timer t; - m_catalogue.ArchiveFile()->updateDiskFileId(archiveFileId, diskInstance, diskFileId); - - // Create a log entry - cta::log::ScopedParamContainer params(m_lc); - params.add("fileId", archiveFileId) - .add("schedulerTime", t.secs()) - .add("diskInstance", diskInstance) - .add("diskFilePath", diskFilePath) - .add("diskFileId", diskFileId); - m_lc.log(cta::log::INFO, "In RequestMessage::processUPDATE_FID(): updated disk file ID."); - - // Set response type - response.set_type(cta::xrd::Response::RSP_SUCCESS); -} - - - // Admin commands void RequestMessage::logAdminCmd(const std::string &function, const std::string &status, const std::string &reason, diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index d9380e7275e4fc0551b77aaabc9c7b230a7c7797..fe3dbdfa257d04035b32cdaef00a8c9413dc8088 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -20,7 +20,7 @@ #include <XrdSsi/XrdSsiEntity.hh> #include "common/utils/utils.hpp" -#include "Versions.hpp" +#include "frontend/common/Version.hpp" #include "XrdSsiCtaServiceProvider.hpp" #include "cta_frontend.pb.h" @@ -33,26 +33,18 @@ class RequestMessage { public: RequestMessage(const XrdSsiEntity &client, const XrdSsiCtaServiceProvider *service) : + m_cliIdentity(client.name, cta::utils::getShortHostname(), client.host, client.prot), m_service(*service), m_catalogue(service->getFrontendService().getCatalogue()), m_scheduler(service->getFrontendService().getScheduler()), m_archiveFileMaxSize(service->getFrontendService().getArchiveFileMaxSize()), m_repackBufferURL(service->getFrontendService().getRepackBufferURL()), - m_verificationMountPolicy(service->getFrontendService().getVerificationMountPolicy()), m_namespaceMap(service->getFrontendService().getNamespaceMap()), m_lc(service->getFrontendService().getLogContext()), m_catalogue_conn_string(service->getFrontendService().getCatalogueConnectionString()) - { - m_cliIdentity.username = client.name; - m_cliIdentity.host = cta::utils::getShortHostname(); // Host should be of the machine that executes the command - m_cliIdentity.clientHost = client.host; - - m_lc.pushOrReplace({"user", m_cliIdentity.username + "@" + m_cliIdentity.host}); - - // Map the client protocol string to an enum value - auto proto_it = m_protomap.find(client.prot); - m_protocol = proto_it != m_protomap.end() ? proto_it->second : Protocol::OTHER; - } + { + m_lc.pushOrReplace({"user", m_cliIdentity.username + "@" + m_cliIdentity.host}); + } /*! * Process a Notification request or an Admin command request @@ -78,7 +70,7 @@ public: const bool &getRequired(cta::admin::OptionBoolean::Key key) const { return m_option_bool.at(key); } - const Versions &getClientVersions() const { + const frontend::Version &getClientVersions() const { return m_client_versions; } const std::string &getClientXrdSsiProtoIntVersion() const { @@ -146,20 +138,6 @@ public: } private: - /*! - * Process Notification events - * - * @param[in] notification Notification request message from EOS WFE - * @param[out] response Response message to return to EOS - */ - void processOPENW (const cta::eos::Notification ¬ification, cta::xrd::Response &response); //!< Ignore OPENW event - void processCREATE (const cta::eos::Notification ¬ification, cta::xrd::Response &response); //!< New archive file ID event - void processCLOSEW (const cta::eos::Notification ¬ification, cta::xrd::Response &response); //!< Archive file event - void processPREPARE (const cta::eos::Notification ¬ification, cta::xrd::Response &response); //!< Retrieve file event - void processABORT_PREPARE(const cta::eos::Notification ¬ification, cta::xrd::Response &response); //!< Abort retrieve file event - void processDELETE (const cta::eos::Notification ¬ification, cta::xrd::Response &response); //!< Delete file event - void processUPDATE_FID (const cta::eos::Notification ¬ification, cta::xrd::Response &response); //!< Disk file ID update event - /*! * Process AdminCmd events * @@ -287,32 +265,14 @@ private: */ void importOptions(const cta::admin::AdminCmd &admincmd); - /*! - * Throw an exception for empty protocol buffer strings - */ - void checkIsNotEmptyString(const std::string &value, const std::string &error_txt) { - if(value.empty()) throw XrdSsiPb::PbException("Protocol buffer field " + error_txt + " is an empty string."); - } - - // Security protocol used to connect - - enum class Protocol { SSS, KRB5, OTHER }; - - const std::map<std::string, Protocol> m_protomap = { - { "sss", Protocol::SSS }, - { "krb5", Protocol::KRB5 }, - }; - // Member variables - Protocol m_protocol; //!< The protocol the client used to connect cta::common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username/host const XrdSsiCtaServiceProvider &m_service; //!< Const reference to the XRootD SSI Service cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests std::optional<std::string> m_repackBufferURL; //!< Repack buffer URL - std::optional<std::string> m_verificationMountPolicy; //!< Repack buffer URL NamespaceMap_t m_namespaceMap; //!< Identifiers for namespace queries cta::log::LogContext m_lc; //!< CTA Log Context std::map<cta::admin::OptionBoolean::Key, bool> m_option_bool; //!< Boolean options @@ -320,7 +280,7 @@ private: std::map<cta::admin::OptionString::Key, std::string> m_option_str; //!< String options std::map<cta::admin::OptionStrList::Key, std::vector<std::string>> m_option_str_list; //!< String List options - Versions m_client_versions; //!< Client CTA and xrootd-ssi-proto version(tag) + frontend::Version m_client_versions; //!< Client CTA and xrootd-ssi-proto version(tag) std::string m_client_cta_version; //!< Client CTA Version std::string m_client_xrd_ssi_proto_int_version; //!< Client xrootd-ssi-protobuf-interface version (tag) std::string m_catalogue_conn_string; //!< Server catalogue connection string diff --git a/xrootd-ssi-protobuf-interface b/xrootd-ssi-protobuf-interface index 5b175209dfc5ea6608058214c23eee8a300d69d7..278327577dafb5ca904754413e27e42a0987c78e 160000 --- a/xrootd-ssi-protobuf-interface +++ b/xrootd-ssi-protobuf-interface @@ -1 +1 @@ -Subproject commit 5b175209dfc5ea6608058214c23eee8a300d69d7 +Subproject commit 278327577dafb5ca904754413e27e42a0987c78e