diff --git a/xroot_plugins/XrdCtaStream.hpp b/xroot_plugins/XrdCtaStream.hpp new file mode 100644 index 0000000000000000000000000000000000000000..a354c0e268837e602426f5412c4caa8a95d10c29 --- /dev/null +++ b/xroot_plugins/XrdCtaStream.hpp @@ -0,0 +1,120 @@ +/*! + * @project The CERN Tape Archive (CTA) + * @brief CTA Frontend stream implementation for cta-admin commands + * @copyright Copyright 2019 CERN + * @license This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <XrdSsiPbOStreamBuffer.hpp> +#include <catalogue/Catalogue.hpp> +#include <scheduler/Scheduler.hpp> + + + +namespace cta { namespace xrd { + +/*! + * Virtual stream object + */ +class XrdCtaStream : public XrdSsiStream +{ +public: + XrdCtaStream(const cta::admin::AdminCmd &admincmd, cta::catalogue::Catalogue &catalogue, cta::scheduler::Scheduler &scheduler) + XrdSsiStream(XrdSsiStream::isActive), + m_catalogue(catalogue), + m_scheduler(scheduler) + { + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "XrdCtaStream() constructor"); + } + + virtual ~ArchiveFileLsStream() { + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~XrdCtaStream() destructor"); + } + + /*! + * Synchronously obtain data from an active stream + * + * Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an + * active stream in the constructor. + * + * @param[out] eInfo The object to receive any error description. + * @param[in,out] dlen input: the optimal amount of data wanted (this is a hint) + * output: the actual amount of data returned in the buffer. + * @param[in,out] last input: should be set to false. + * output: if true it indicates that no more data remains to be returned + * either for this call or on the next call. + * + * @return Pointer to the Buffer object that contains a pointer to the the data (see below). The + * buffer must be returned to the stream using Buffer::Recycle(). The next member is usable. + * @retval 0 No more data remains or an error occurred: + * last = true: No more data remains. + * last = false: A fatal error occurred, eRef has the reason. + */ + virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override { + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); + + XrdSsiPb::OStreamBuffer<Data> *streambuf; + + try { + if(isDone()) { + // Nothing more to send, close the stream + last = true; + return nullptr; + } + + streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); + + dlen = fillBuffer(streambuf); + + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); + } catch(cta::exception::Exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } catch(std::exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } catch(...) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } + return streambuf; + } + + /*! + * Returns true if there is nothing more to send (i.e. we can close the stream) + */ + bool isDone() const = 0; + + /*! + * Fills the stream buffer + */ + int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) = 0; + +protected: + cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue + cta::scheduler::Scheduler &m_scheduler; //!< Reference to CTA Scheduler + +private: + static constexpr const char* const LOG_SUFFIX = "XrdCtaStream"; //!< Identifier for log messages +}; + +}} // namespace cta::xrd