diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index f00f10f946fcb9aa2b7a1705bb96650564ddebc9..3ab47a1c8c96def5b80e79f5a5c23fb91e4d6245 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -52,7 +52,7 @@ void RequestCallback<cta::xrd::Alert>::operator()(const cta::xrd::Alert &alert) * Defines how incoming records from the stream should be handled */ template<> -void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) +void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const { const cta::admin::ArchiveFileLsItem &af_ls_item = record.af_ls_item(); diff --git a/xroot_plugins/XrdCtaArchiveFileLs.hpp b/xroot_plugins/XrdCtaArchiveFileLs.hpp index c1f4c7e167571006bfaf0a98bd0bda021e346e12..ddcb05dcdca0bed80e0522b2352a1b34befd9c96 100644 --- a/xroot_plugins/XrdCtaArchiveFileLs.hpp +++ b/xroot_plugins/XrdCtaArchiveFileLs.hpp @@ -69,47 +69,68 @@ public: #ifdef XRDSSI_DEBUG std::cerr << "[DEBUG] ArchiveFileLsStream::GetBuff(): XrdSsi buffer fill request (" << dlen << " bytes)" << std::endl; #endif - if(!m_archiveFileItor.hasMore()) - { - // Nothing more to send, close the stream - last = true; - return nullptr; - } - XrdSsiPb::OStreamBuffer *streambuf = new XrdSsiPb::OStreamBuffer(); - const cta::common::dataStructures::ArchiveFile archiveFile = m_archiveFileItor.next(); - - for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) { - cta::xrd::Data record; - - // Copy number - record.mutable_af_ls_item()->set_copy_nb(jt->first); - - // Archive file - auto af = record.mutable_af_ls_item()->mutable_af(); - af->set_archive_id(archiveFile.archiveFileID); - af->set_disk_instance(archiveFile.diskInstance); - af->set_disk_id(archiveFile.diskFileId); - af->set_size(archiveFile.fileSize); - af->mutable_cs()->set_type(archiveFile.checksumType); - af->mutable_cs()->set_value(archiveFile.checksumValue); - af->set_storage_class(archiveFile.storageClass); - af->mutable_df()->set_owner(archiveFile.diskFileInfo.owner); - af->mutable_df()->set_group(archiveFile.diskFileInfo.group); - af->mutable_df()->set_path(archiveFile.diskFileInfo.path); - af->set_creation_time(archiveFile.creationTime); - - // Tape file - auto tf = record.mutable_af_ls_item()->mutable_tf(); - tf->set_vid(jt->second.vid); - tf->set_f_seq(jt->second.fSeq); - tf->set_block_id(jt->second.blockId); - - dlen = streambuf->serialize(record); + XrdSsiPb::OStreamBuffer<cta::xrd::Data> *streambuf; + + try { + if(!m_archiveFileItor.hasMore()) { + // Nothing more to send, close the stream + last = true; + return nullptr; + } + + streambuf = new XrdSsiPb::OStreamBuffer<cta::xrd::Data>(dlen); + + for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; ) + { + const cta::common::dataStructures::ArchiveFile archiveFile = m_archiveFileItor.next(); + + for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) { + cta::xrd::Data record; + + // Copy number + record.mutable_af_ls_item()->set_copy_nb(jt->first); + + // Archive file + auto af = record.mutable_af_ls_item()->mutable_af(); + af->set_archive_id(archiveFile.archiveFileID); + af->set_disk_instance(archiveFile.diskInstance); + af->set_disk_id(archiveFile.diskFileId); + af->set_size(archiveFile.fileSize); + af->mutable_cs()->set_type(archiveFile.checksumType); + af->mutable_cs()->set_value(archiveFile.checksumValue); + af->set_storage_class(archiveFile.storageClass); + af->mutable_df()->set_owner(archiveFile.diskFileInfo.owner); + af->mutable_df()->set_group(archiveFile.diskFileInfo.group); + af->mutable_df()->set_path(archiveFile.diskFileInfo.path); + af->set_creation_time(archiveFile.creationTime); + + // Tape file + auto tf = record.mutable_af_ls_item()->mutable_tf(); + tf->set_vid(jt->second.vid); + tf->set_f_seq(jt->second.fSeq); + tf->set_block_id(jt->second.blockId); + + is_buffer_full = streambuf->Push(record); + } + } + dlen = streambuf->Size(); +#ifdef XRDSSI_DEBUG + std::cerr << "[DEBUG] ArchiveFileLsStream::GetBuff(): Returning buffer with " << dlen << " bytes of data." << std::endl; +#endif + } catch(cta::exception::Exception &ex) { + throw std::runtime_error(ex.getMessage().str()); + } catch(std::exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } catch(...) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; } - -std::cerr << "Returning buffer with " << dlen << " bytes of data." << std::endl; - return streambuf; } @@ -117,11 +138,5 @@ private: catalogue::ArchiveFileItor m_archiveFileItor; }; -#if 0 -ArchiveFileLsStream(m_catalogue.getArchiveFiles(searchCriteria), has_flag(OptionBoolean::SHOW_HEADER)); - -m_listArchiveFilesCmd.reset(new xrootPlugins::ListArchiveFilesCmd(xrdSfsFileError, has_flag(OptionBoolean::SHOW_HEADER), std::move(archiveFileItor))); -#endif - }} // namespace cta::xrd diff --git a/xroot_ssi_pb/XrdSsiPbIStreamBuffer.hpp b/xroot_ssi_pb/XrdSsiPbIStreamBuffer.hpp index d05f02bd70b5eb267210b5a17a120911784c8b6c..a026f0c9b448d0d62479b7a8c3f3a600d9816ec3 100644 --- a/xroot_ssi_pb/XrdSsiPbIStreamBuffer.hpp +++ b/xroot_ssi_pb/XrdSsiPbIStreamBuffer.hpp @@ -66,7 +66,7 @@ public: * @param[in] buf_ptr XRootD SSI stream or data buffer * @param[in] buf_len Size of buf_ptr */ - void push(const char *buf_ptr, int buf_len); + void Push(const char *buf_ptr, int buf_len); private: /*! @@ -85,9 +85,11 @@ private: bool popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream); /*! + * Callback to handle stream data * + * Define a specialised version of this method on the client side to handle a specific type of data stream */ - void DataCallback(DataType record) { + void DataCallback(DataType record) const { throw XrdSsiException("Stream/data payload received, but IStreamBuffer::DataCallback() has not been defined"); } @@ -101,7 +103,7 @@ private: template<typename DataType> -void IStreamBuffer<DataType>::push(const char *buf_ptr, int buf_len) +void IStreamBuffer<DataType>::Push(const char *buf_ptr, int buf_len) { google::protobuf::io::CodedInputStream input_stream(reinterpret_cast<const uint8_t*>(buf_ptr), buf_len); diff --git a/xroot_ssi_pb/XrdSsiPbOStreamBuffer.hpp b/xroot_ssi_pb/XrdSsiPbOStreamBuffer.hpp index fe3e639bbd58de9f1de50d4b412a44f9d16d2aca..7df24933b512dbbde1f78509b7843a1fb1c83364 100644 --- a/xroot_ssi_pb/XrdSsiPbOStreamBuffer.hpp +++ b/xroot_ssi_pb/XrdSsiPbOStreamBuffer.hpp @@ -27,15 +27,28 @@ namespace XrdSsiPb { * * This class binds XrdSsiStream::Buffer to the stream interface. * - * This is a naïve implementation, where memory is allocated and deallocated on every use. + * This is a naïve implementation, where memory is allocated and deallocated on every use. It favours + * computation efficiency over memory efficiency: the buffer allocated is twice the hint size, but data + * copying is minimized. * * A more performant implementation could be implemented using a buffer pool, using the Recycle() * method to return buffers to the pool. */ +template<typename DataType> class OStreamBuffer : public XrdSsiStream::Buffer { public: - OStreamBuffer() : XrdSsiStream::Buffer(nullptr) { + /*! + * Constructor + * + * data is a public member of XrdSsiStream::Buffer. It is an unmanaged char* pointer. We initialize + * it to double the hint size, with the implicit rule that the size of an individual serialized + * record on the wire cannot exceed the hint size. + */ + OStreamBuffer(uint32_t hint_size) : XrdSsiStream::Buffer(new char[hint_size * 2]), + m_hint_size(hint_size), + m_data_ptr(data), + m_data_size(0) { #ifdef XRDSSI_DEBUG std::cerr << "[DEBUG] OStreamBuffer() constructor" << std::endl; #endif @@ -45,36 +58,57 @@ public: #ifdef XRDSSI_DEBUG std::cerr << "[DEBUG] OStreamBuffer() destructor" << std::endl; #endif - // data is a public member of XrdSsiStream::Buffer. It is an unmanaged char* pointer. delete[] data; } - //! Serialize a Protocol Buffer into the Stream Buffer and return its size - template<typename ProtoBuf> - int serialize(const ProtoBuf &pb) { - // Drop any existing data. A more sophisticated implementation could chain new objects onto the buffer, - // using XrdSsiStream::Buffer::Next - delete[] data; + /*! + * Get the data size + */ + uint32_t Size() const { + return m_data_size; + } + + /*! + * Push a protobuf into the queue + * + * @retval true The buffer has been filled and is ready for sending + * @retval false There is room in the buffer for more records + */ + bool Push(const DataType &record) { + uint32_t bytesize = record.ByteSize(); - uint32_t bytesize = pb.ByteSize(); - data = new char[bytesize + sizeof(uint32_t)]; + if(m_data_size + bytesize > m_hint_size * 2) { + throw XrdSsiException("OStreamBuffer::Push(): Stream buffer overflow"); + } - // Write the size into the buffer - google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(bytesize, reinterpret_cast<google::protobuf::uint8*>(data)); + // Write the size of the next record into the buffer + google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(bytesize, reinterpret_cast<google::protobuf::uint8*>(m_data_ptr)); + m_data_ptr += sizeof(uint32_t); // Serialize the Protocol Buffer - pb.SerializeToArray(data + sizeof(uint32_t), bytesize); + record.SerializeToArray(m_data_ptr, bytesize); + m_data_ptr += bytesize; - // Return the total size of the buffer including bytesize - return bytesize + sizeof(uint32_t); + m_data_size += sizeof(uint32_t) + bytesize; + + return m_data_size >= m_hint_size; } private: - //! Called by the XrdSsi framework when it is finished with the object + /*! + * Called by the XrdSsi framework when it is finished with the object + */ virtual void Recycle() { std::cerr << "[DEBUG] OStreamBuffer::Recycle()" << std::endl; delete this; } + + // Member variables + + const uint32_t m_hint_size; //!< Requested size of the buffer from the XRootD framework + char *m_data_ptr; //!< Pointer to the raw storage + uint32_t m_data_size; //!< Total size of the buffer on the wire + std::vector<std::pair<uint32_t, DataType>> m_protobuf_q; //!< Queue of protobufs to be serialized }; } // namespace XrdSsiPb diff --git a/xroot_ssi_pb/XrdSsiPbRequest.hpp b/xroot_ssi_pb/XrdSsiPbRequest.hpp index 0c8c59f19c8e0f3c340f552fa2f0ca29d27a1b5d..63065f93f6b14911b7b44be1577cf20ef3cd3351 100644 --- a/xroot_ssi_pb/XrdSsiPbRequest.hpp +++ b/xroot_ssi_pb/XrdSsiPbRequest.hpp @@ -309,7 +309,7 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, DataType, AlertType> if(response_buflen != 0) { // Push stream/data buffer onto the input stream for the client - m_istream_buffer.push(response_bufptr, response_buflen); + m_istream_buffer.Push(response_bufptr, response_buflen); } if(is_last) // No more data to come