Skip to content
Snippets Groups Projects
Commit 5439b895 authored by Michael Davis's avatar Michael Davis
Browse files

[xrd_ssi] Adds IStreamBuffer::DataCallback()

This is for client-side processing of incoming records on the stream
parent 7c603d7c
Branches
Tags
No related merge requests found
......@@ -19,7 +19,6 @@
#pragma once
#include <google/protobuf/io/coded_stream.h>
#include "XrdSsiPbException.hpp"
namespace XrdSsiPb {
......@@ -27,16 +26,16 @@ namespace XrdSsiPb {
/*!
* Input Stream Buffer class
*
* This implementation is for a record-based stream, i.e. the client must be configured with a XrdSsi stream buffer size
* This implementation is for a record-based stream. The client should be configured with a XrdSsi stream buffer size
* which is large with respect to the maximum size of DataType. This is mainly for efficiency reasons as there is a
* little extra overhead in handling records which are split across two SSI buffers. There is a hard limit that the
* record size cannot exceed the buffer size.
* little extra data copying overhead in handling records which are split across two SSI buffers. Note that there is
* also a hard limit: the record size cannot exceed the buffer size.
*
* The buffer size parameter is set in the constructor to XrdSsiPbServiceClientSide. The size of DataType is the maximum
* encoded size of the DataType protocol buffer on the wire.
*
* If there is a requirement to stream arbitrarily large binary blobs rather than records, this functionality needs to
* be added. See the comments on Request::ProcessResponseData().
* If there is a requirement to stream arbitrarily large binary blobs rather than records, this functionality will need
* to be added. See the comments on Request::ProcessResponseData().
*/
template<typename DataType>
class IStreamBuffer
......@@ -44,12 +43,12 @@ class IStreamBuffer
public:
IStreamBuffer(uint32_t bufsize) :
m_max_msglen(bufsize-sizeof(uint32_t)),
m_split_buffer(std::unique_ptr<uint8_t[]>(new uint8_t[m_max_msglen])),
m_split_buflen(0)
{
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] IStreamBuffer() constructor" << std::endl;
#endif
m_split_buffer = std::unique_ptr<uint8_t[]>(new uint8_t[m_max_msglen]);
}
~IStreamBuffer() {
......@@ -63,15 +62,35 @@ public:
*
* NOTE: This method is not reentrant; it is assumed it will be called from the XrdSsi framework
* in single-threaded mode. Each client or client thread must set up its own stream.
*
* @param[in] buf_ptr XRootD SSI stream or data buffer
* @param[in] buf_len Size of buf_ptr
*/
void push(const char *buf_ptr, int buf_len);
private:
/*!
* Pop a single record of known size from an input stream and pass it to the client
* Pop a single record from an input stream and pass it to the client
*
* If the message is split across the boundary between buffers, the partial message is saved and
* the method returns false.
*
* @param[in] msg_len Size of the next Protocol Buffer message on the wire
* @param[in,out] input_stream Protocol Buffer Coded Input Stream object wrapping the XRootD
* SSI buffer
*
* @retval true There are more messages to process on the stream
* @retval false End of the input stream was reached
*/
bool popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream);
/*!
*
*/
void DataCallback(DataType record) {
throw XrdSsiException("Stream/data payload received, but IStreamBuffer::DataCallback() has not been defined");
}
// Member variables
const uint32_t m_max_msglen; //!< Maximum allowed length of a protobuf on the wire
......@@ -89,28 +108,20 @@ void IStreamBuffer<DataType>::push(const char *buf_ptr, int buf_len)
uint32_t msg_len;
if(m_split_buflen > 0) {
// Stitch together the partial record
// Stitch together the saved partial record and the incoming record
if(m_split_buflen <= sizeof(uint32_t)) {
// The size field is split across the boundary, only need to copy that field
// The size field is split across the boundary, just copy that one field
int bytes_to_copy = sizeof(uint32_t) - m_split_buflen;
std::cout << "BEFORE <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
memcpy(m_split_buffer.get() + m_split_buflen, buf_ptr, bytes_to_copy);
input_stream.Skip(bytes_to_copy);
std::cout << "AFTER <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len);
std::cout << "Option 1, msg_len = " << msg_len << std::endl;
popRecord(msg_len, input_stream);
} else {
// The payload is split across the boundary, copy the rest of the record and extract it
// The payload is split across the boundary, copy the entire record
google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len);
if(msg_len > m_max_msglen) {
......@@ -131,6 +142,7 @@ std::cout << "Option 2, msg_len = " << msg_len << std::endl;
do {
const char *buf_ptr;
// Get pointer to next record
if(!input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len)) break;
std::cout << "[POP_RECORDS] buf_len = " << buf_len << std::endl;
......@@ -138,15 +150,7 @@ std::cout << "[POP_RECORDS] buf_len = " << buf_len << std::endl;
// Size field is split across the boundary, save the partial field and finish
std::cout << "[POP_RECORDS] Saving split of " << buf_len << " bytes" << std::endl;
m_split_buflen = buf_len;
std::cout << "SAVING/BEFORE <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
memcpy(m_split_buffer.get(), buf_ptr, m_split_buflen);
std::cout << "SAVING/AFTER <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
break;
}
......@@ -168,13 +172,13 @@ bool IStreamBuffer<DataType>::popRecord(int msg_len, google::protobuf::io::Coded
throw XrdSsiException("Data record size exceeds XRootD SSI buffer size");
}
// Get pointer to next record
if(!input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len)) buf_len = 0;
std::cout << "[POP_RECORD] buf_len = " << buf_len << ", msg_len = " << msg_len << std::endl;
if(buf_len < msg_len) {
std::cout << "[POP_RECORD] Saving split of 4+" << buf_len << " bytes" << std::endl;
// Record payload is split across the boundary, save the partial record
google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(msg_len, m_split_buffer.get());
memcpy(m_split_buffer.get() + sizeof(uint32_t), buf_ptr, buf_len);
m_split_buflen = buf_len + sizeof(uint32_t);
......@@ -185,6 +189,7 @@ std::cout << "[POP_RECORD] Saving split of 4+" << buf_len << " bytes" << std::en
record.ParseFromArray(buf_ptr, msg_len);
input_stream.Skip(msg_len);
DataCallback(record);
// If the message terminates at the end of the buffer, we are done, otherwise keep going
return buf_len != msg_len;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment