Skip to content
Snippets Groups Projects
Commit efe7574b authored by Michael Davis's avatar Michael Davis
Browse files

[XrdSsi] Integrates EosCtaStub.cpp with XrdSsiPbServiceClientSide.h

parent 55ef03d1
No related branches found
No related tags found
No related merge requests found
......@@ -27,3 +27,17 @@ install (TARGETS cta DESTINATION usr/bin)
INSTALL (FILES cta-cli.conf DESTINATION ${CMAKE_INSTALL_SYSCONFDIR}/cta)
include_directories (${CMAKE_SOURCE_DIR}/tapeserver/)
#
# eosctastub is a drop-in replacement for "cta archive|retrieve|delete"
#
# XRootD must be compiled with the SSI extensions
find_package(xrootd REQUIRED)
find_package(Protobuf3 REQUIRED)
include_directories(${PROTOBUF3_INCLUDE_DIRS} ${XROOTD_INCLUDE_DIR} ../xroot_ssi_pb)
add_executable(eoscta_stub EosCtaStub.cpp)
target_link_libraries (eoscta_stub ctaeosmessages ${PROTOBUF3_LIBRARIES})
#install(TARGETS eoscta_stub DESTINATION usr/bin)
// Bind the XRootD SSI transport layer to a set of Google Protocol Buffer definitions
#ifndef __EOS_CTA_API_H
#define __EOS_CTA_API_H
#include "XrdSsiPbServiceClientSide.h" // XRootD SSI/Protocol Buffer Service bindings (client side)
#include "eos/messages/eos_messages.pb.h" // Auto-generated message types from .proto file
#if 0
// Bind the type of the XrdSsiService to the types defined in the .proto file
typedef XrdSsiPbServiceClientSide<xrdssi::test::Request, // Request message type
xrdssi::test::Result, // Response message type
xrdssi::test::Metadata, // Metadata message type
xrdssi::test::Alert> // Alert message type
XrdSsiPbServiceType;
#endif
#endif
/*!
*
* @project The CERN Tape Archive (CTA)
* @brief Command-line tool to test EOS-CTA interface
* @description Proof-of-concept stub to combine Google Protocol Buffers and XRootD SSI transport
* @copyright Copyright 2017 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include <stdexcept>
#include <google/protobuf/util/json_util.h>
#include "EosCtaApi.h"
#include "common/dataStructures/FrontendReturnCode.hpp"
const std::runtime_error Usage("Usage: eoscta_stub archive|retrieve|delete [options] [--stderr]");
/*!
* Convert protocol buffer to JSON (for debug output)
*
* @param message A Google protocol buffer
* @returns The PB converted to JSON format
*/
static std::string MessageToJsonString(const google::protobuf::Message &message)
{
using namespace google::protobuf::util;
std::string output;
JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
MessageToJsonString(message, &output, options); // returns util::Status
return output;
}
/*!
* Returns true if --stderr is on the command-line.
*
* @param argc The number of command-line arguments.
* @param argv The command-line arguments.
*/
static bool stderrIsOnTheCmdLine(int argc, const char *const *const argv)
{
for(int i = 1; i < argc; i++)
{
const std::string arg(argv[i]);
if(arg == "--stderr") return true;
}
return false;
}
#if 0
#include "cmdline/Configuration.hpp"
#include "common/Configuration.hpp"
#include <cryptopp/base64.h>
#include <cryptopp/osrng.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <xrootd/XrdCl/XrdClFile.hh>
/**
* Replaces all occurrences in a string "str" of a substring "from" with the string "to"
*
* @param str The original string
* @param from The substring to replace
* @param to The replacement string
*/
void replaceAll(std::string& str, const std::string& from, const std::string& to){
if(from.empty() || str.empty())
return;
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length();
}
}
/**
* Encodes a string in base 64 and replaces slashes ('/') in the result
* with underscores ('_').
*
* @param msg string to encode
* @return encoded string
*/
std::string encode(const std::string msg) {
std::string ret;
const bool noNewLineInBase64Output = false;
CryptoPP::StringSource ss1(msg, true, new CryptoPP::Base64Encoder(new CryptoPP::StringSink(ret), noNewLineInBase64Output));
// need to replace slashes ('/') with underscores ('_') because xroot removes
// consecutive slashes, and the cryptopp base64 algorithm may produce
// consecutive slashes. This is solved in cryptopp-5.6.3 (using
// Base64URLEncoder instead of Base64Encoder) but we currently have
// cryptopp-5.6.2. To be changed in the future...
replaceAll(ret, "/", "_");
return ret;
}
/**
* Formats the command path string
*
* @param argc The number of command-line arguments.
* @param argv The command-line arguments.
* @return the command string
*/
std::string formatCommandPath(const int argc, const char **argv) {
cta::cmdline::Configuration cliConf("/etc/cta/cta-cli.conf");
std::string cmdPath = "root://"+cliConf.getFrontendHostAndPort()+"//";
for(int i=0; i<argc; i++) {
if(i) cmdPath += "&";
cmdPath += encode(std::string(argv[i]));
}
return cmdPath;
}
/**
* Sends the command and waits for the reply
*
* @param argc The number of command-line arguments.
* @param argv The command-line arguments.
* @return the return code
*/
int sendCommand(const int argc, const char **argv) {
int rc = 0;
const bool writeToStderr = stderrIsOnTheCmdLine(argc, argv);
const std::string cmdPath = formatCommandPath(argc, argv);
XrdCl::File xrootFile;
// Open the xroot file reprsenting the execution of the command
{
const XrdCl::Access::Mode openMode = XrdCl::Access::None;
const uint16_t openTimeout = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds
const XrdCl::XRootDStatus openStatus = xrootFile.Open(cmdPath, XrdCl::OpenFlags::Read, openMode, openTimeout);
if(!openStatus.IsOK()) {
throw std::runtime_error(std::string("Failed to open ") + cmdPath + ": " + openStatus.ToStr());
}
}
// The cta frontend return code is the first char of the answer
{
uint64_t readOffset = 0;
uint32_t bytesRead = 0;
char rc_char = '0';
const XrdCl::XRootDStatus readStatus = xrootFile.Read(readOffset, 1, &rc_char, bytesRead);
if(!readStatus.IsOK()) {
throw std::runtime_error(std::string("Failed to read first byte from ") + cmdPath + ": " +
readStatus.ToStr());
}
if(bytesRead != 1) {
throw std::runtime_error(std::string("Failed to read first byte from ") + cmdPath +
": Expected to read exactly 1 byte, actually read " +
std::to_string((long long unsigned int)bytesRead) + " bytes");
}
rc = rc_char - '0';
}
// Read and print the command result
{
uint64_t readOffset = 1; // The first character at offset 0 has already been read
uint32_t bytesRead = 0;
const size_t bufSize = 20480;
std::unique_ptr<char []> buf(new char[bufSize]);
do {
bytesRead = 0;
memset(buf.get(), 0, bufSize);
const XrdCl::XRootDStatus readStatus = xrootFile.Read(readOffset, bufSize - 1, buf.get(), bytesRead);
if(!readStatus.IsOK()) {
throw std::runtime_error(std::string("Failed to read ") + cmdPath + ": " + readStatus.ToStr());
}
if(bytesRead > 0) {
std::cout << buf.get();
if(writeToStderr) {
std::cerr << buf.get();
}
}
readOffset += bytesRead;
} while(bytesRead > 0);
}
// Close the xroot file reprsenting the execution of the command
{
const XrdCl::XRootDStatus closeStatus = xrootFile.Close();
if(!closeStatus.IsOK()) {
throw std::runtime_error(std::string("Failed to close ") + cmdPath + ": " + closeStatus.ToStr());
}
}
return rc;
}
#endif
int exceptionThrowingMain(int argc, const char *const *const argv)
{
// Verify that the version of the Google Protocol Buffer library that we linked against is
// compatible with the version of the headers we compiled against
GOOGLE_PROTOBUF_VERIFY_VERSION;
// Parse which workflow action to execute
if(argc < 2) throw Usage;
const std::string wf_command(argv[1]);
if(wf_command == "retrieve" || wf_command == "delete") throw std::runtime_error(wf_command + " is not implemented yet.");
if(wf_command != "archive") throw Usage;
// Fill the protocol buffer from the command line arguments
eos::wfe::Notification notification;
// Output the protocol buffer as a JSON object (for debugging)
std::cout << MessageToJsonString(notification);
#if 0
// Obtain a Service Provider
XrdSsiPbServiceType test_ssi_service(host, port, resource);
// Create a Request object
xrdssi::test::Request request;
request.set_message_text("Archive some file");
// Output message in Json format
std::cout << "Sending message:" << std::endl;
std::cout << MessageToJsonString(request);
// Send the Request to the Service
test_ssi_service.send(request);
// Wait for the response callback.
std::cout << "Request sent, going to sleep..." << std::endl;
// When test_ssi_service goes out-of-scope, the Service will try to shut down, but will wait
// for outstanding Requests to be processed
int wait_secs = 5;
while(--wait_secs)
{
std::cerr << ".";
sleep(1);
}
std::cout << "All done, exiting." << std::endl;
#endif
// Send output to stdout or stderr?
std::ostream &myout = stderrIsOnTheCmdLine(argc, argv) ? std::cerr : std::cout;
myout << "Hello, world" << std::endl;
// Optional: Delete all global objects allocated by libprotobuf
google::protobuf::ShutdownProtobufLibrary();
return 0;
}
/*!
* Start here
*
* @param argc The number of command-line arguments
* @param argv The command-line arguments
*/
int main(int argc, const char **argv)
{
try {
return exceptionThrowingMain(argc, argv);
} catch (std::exception &ex) {
std::cerr << "Failed to execute the command. Reason: " << ex.what() << std::endl;
return cta::common::dataStructures::FrontendReturnCode::ctaErrorNoRetry;
} catch (...) {
std::cerr << "Failed to execute the command for an unknown reason" << std::endl;
return cta::common::dataStructures::FrontendReturnCode::ctaErrorNoRetry;
}
}
# XRootD SSI + Google Protocol Buffers 3
This directory contains generic classes which bind protocol buffer definitions to the XRootD SSI
transport layer.
......@@ -2,7 +2,7 @@
#define __XRD_SSI_PB_ALERT_H
#include <XrdSsi/XrdSsiRespInfo.hh>
#include "XrdSsiException.h"
#include "XrdSsiPbException.h"
template<typename AlertType>
class AlertMsg : public XrdSsiRespInfoMsg
......@@ -14,7 +14,7 @@ public:
if(!alert.SerializeToString(&alert_str))
{
throw XrdSsiException("alert.SerializeToString() failed");
throw XrdSsiPbException("alert.SerializeToString() failed");
}
msgBuf = const_cast<char*>(alert_str.c_str());
......
#ifndef __XRD_SSI_EXCEPTION_H
#define __XRD_SSI_EXCEPTION_H
// Class to convert a XRootD error into a std::exception
// Perhaps should be part of XRootD?
/*!
* Class to convert a XRootD error into a std::exception
*/
#include <stdexcept>
#include <XrdSsi/XrdSsiErrInfo.hh>
class XrdSsiException : public std::exception
class XrdSsiPbException : public std::exception
{
public:
XrdSsiException(const std::string &err_msg) : m_err_msg(err_msg) {}
XrdSsiException(const XrdSsiErrInfo &eInfo) : m_err_msg(eInfo.Get()) {}
XrdSsiPbException(const std::string &err_msg) : m_err_msg(err_msg) {}
XrdSsiPbException(const XrdSsiErrInfo &eInfo) : m_err_msg(eInfo.Get()) {}
const char* what() const noexcept { return m_err_msg.c_str(); }
......
File moved
......@@ -2,7 +2,7 @@
#define __XRD_SSI_PB_REQUEST_PROC_H
#include <XrdSsi/XrdSsiResponder.hh>
#include "XrdSsiException.h"
#include "XrdSsiPbException.h"
#include "XrdSsiPbAlert.h"
/*
......@@ -70,7 +70,7 @@ void RequestProc<RequestType, ResponseType, MetadataType, AlertType>::Execute()
if(!m_request.ParseFromArray(request_buffer, request_len))
{
throw XrdSsiException("m_request.ParseFromArray() failed");
throw XrdSsiPbException("m_request.ParseFromArray() failed");
}
// Release the request buffer
......@@ -93,7 +93,7 @@ void RequestProc<RequestType, ResponseType, MetadataType, AlertType>::Execute()
if(!m_metadata.SerializeToString(&m_metadata_str))
{
throw XrdSsiException("m_metadata.SerializeToString() failed");
throw XrdSsiPbException("m_metadata.SerializeToString() failed");
}
// Send the Metadata
......@@ -107,7 +107,7 @@ void RequestProc<RequestType, ResponseType, MetadataType, AlertType>::Execute()
if(!m_response.SerializeToString(&m_response_str))
{
throw XrdSsiException("m_response.SerializeToString() failed");
throw XrdSsiPbException("m_response.SerializeToString() failed");
}
// Send the response
......
#ifndef __XRD_SSI_PB_SERVICE_CLIENT_SIDE_H
#define __XRD_SSI_PB_SERVICE_CLIENT_SIDE_H
#include <unistd.h> // sleep()
#include <XrdSsi/XrdSsiProvider.hh>
#include <XrdSsi/XrdSsiService.hh>
#include "XrdSsiException.h"
#include "XrdSsiPbException.h"
#include "XrdSsiPbRequest.h"
......@@ -45,7 +47,7 @@ public:
if(!(m_server_ptr = XrdSsiProviderClient->GetService(eInfo, hostname + ":" + std::to_string(port))))
{
throw XrdSsiException(eInfo);
throw XrdSsiPbException(eInfo);
}
}
......@@ -120,7 +122,7 @@ void XrdSsiPbServiceClientSide<RequestType, ResponseType, MetadataType, AlertTyp
if(!request.SerializeToString(&request_str))
{
throw XrdSsiException("request.SerializeToString() failed");
throw XrdSsiPbException("request.SerializeToString() failed");
}
// Requests are always executed in the context of a service. They need to correspond to what the service allows.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment