Skip to content
Snippets Groups Projects
Commit 7a8565c8 authored by Michael Davis's avatar Michael Davis
Browse files

[ssi_af_ls] Debugs and refactors IStreamBuffer class

parent 7e74127c
No related branches found
No related tags found
No related merge requests found
......@@ -62,7 +62,7 @@ tmp_num_items = 0;
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] ArchiveFileLsStream::GetBuff(): XrdSsi buffer fill request (" << dlen << " bytes)" << std::endl;
#endif
if(tmp_num_items > 9)
if(tmp_num_items > 30)
{
// Nothing more to send, close the stream
last = true;
......
......@@ -70,12 +70,7 @@ private:
/*!
* Pop a single record of known size from an input stream and pass it to the client
*/
void popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream);
/*!
* Pop all items from an input stream and pass them to the client one-at-a-time
*/
void popRecords(google::protobuf::io::CodedInputStream &input_stream);
bool popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream);
// Member variables
......@@ -91,128 +86,110 @@ void IStreamBuffer<DataType>::push(const char *buf_ptr, int buf_len)
{
google::protobuf::io::CodedInputStream input_stream(reinterpret_cast<const uint8_t*>(buf_ptr), buf_len);
uint32_t msg_len;
if(m_split_buflen > 0) {
// Stitch together the partial record
if(m_split_buflen < sizeof(uint32_t)) {
if(m_split_buflen <= sizeof(uint32_t)) {
// The size field is split across the boundary, only need to copy that field
int bytes_to_copy = sizeof(uint32_t) - m_split_buflen;
std::cout << "BEFORE <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
memcpy(m_split_buffer.get() + m_split_buflen, buf_ptr, bytes_to_copy);
input_stream.Skip(bytes_to_copy);
std::cout << "AFTER <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
uint32_t msg_len;
google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len);
input_stream.Skip(bytes_to_copy);
std::cout << "Option 1, msg_len = " << msg_len << std::endl;
popRecord(msg_len, input_stream);
} else {
// The payload is split across the boundary, copy the rest of the record
// The payload is split across the boundary, copy the rest of the record and extract it
uint32_t msg_len;
google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len);
if(msg_len > m_max_msglen) {
throw XrdSsiException("Data record size exceeds XRootD SSI buffer size");
}
int bytes_to_copy = msg_len - m_split_buflen - sizeof(uint32_t);
int bytes_to_copy = msg_len + sizeof(uint32_t) - m_split_buflen;
memcpy(m_split_buffer.get() + m_split_buflen, buf_ptr, bytes_to_copy);
input_stream.Skip(bytes_to_copy);
google::protobuf::io::CodedInputStream split_stream(reinterpret_cast<const uint8_t*>(m_split_buffer.get() + sizeof(uint32_t)), msg_len);
std::cout << "Option 2, msg_len = " << msg_len << std::endl;
popRecord(msg_len, split_stream);
}
}
// Extract records from the input buffer
popRecords(input_stream);
// Copy any leftover partial record to the holding buffer
input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &m_split_buflen);
memcpy(m_split_buffer.get(), buf_ptr, m_split_buflen);
}
template<typename DataType>
void IStreamBuffer<DataType>::popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream)
{
const char *buf_ptr;
int buf_len;
if(msg_len > m_max_msglen) {
throw XrdSsiException("Data record size exceeds XRootD SSI buffer size");
}
input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len);
std::cout << "[POP_RECORD] buf_len = " << buf_len << std::endl;
if(buf_len < msg_len) {
// Record payload is split across the boundary, save the partial record
m_split_buflen = msg_len;
memcpy(m_split_buffer.get(), buf_ptr, m_split_buflen);
} else {
DataType record;
record.ParseFromArray(buf_ptr, msg_len);
}
input_stream.Skip(msg_len);
}
template<typename DataType>
void IStreamBuffer<DataType>::popRecords(google::protobuf::io::CodedInputStream &input_stream)
{
uint32_t msg_len;
int buf_len;
// Extract remaining records from the input buffer
do {
const char *buf_ptr;
// Get size of next item on the stream
input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len);
std::cout << "buf_len = " << buf_len << std::endl;
if(!input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len)) break;
std::cout << "[POP_RECORDS] buf_len = " << buf_len << std::endl;
if(buf_len < static_cast<int>(sizeof(uint32_t))) {
// Size field is split across the boundary, save the partial field and finish
std::cout << "[POP_RECORDS] Saving split of " << buf_len << " bytes" << std::endl;
m_split_buflen = buf_len;
std::cout << "SAVING/BEFORE <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
memcpy(m_split_buffer.get(), buf_ptr, m_split_buflen);
input_stream.Skip(m_split_buflen);
std::cout << "SAVING/AFTER <" << static_cast<int>(m_split_buffer.get()[0])
<< "><" << static_cast<int>(m_split_buffer.get()[1])
<< "><" << static_cast<int>(m_split_buffer.get()[2])
<< "><" << static_cast<int>(m_split_buffer.get()[3]) << ">" << std::endl;
break;
}
// Get size of next item on the stream
input_stream.ReadLittleEndian32(&msg_len);
std::cout << "Bytesize = " << msg_len << std::endl;
// Get next item on the stream
popRecord(msg_len, input_stream);
} while(buf_len != static_cast<int>(msg_len));
std::cout << "[POP_RECORDS] Popping next message, msg_len = " << msg_len << std::endl;
} while(popRecord(msg_len, input_stream));
}
#if 0
template<typename DataType>
void IStreamBuffer<DataType>::popSplitItem(google::protobuf::io::CodedInputStream *first_stream, google::protobuf::io::CodedInputStream *second_stream)
bool IStreamBuffer<DataType>::popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream)
{
DataType record;
const char *buf_ptr;
int buf_len;
uint32_t msg_len;
int buf_len1, buf_len2;
const char *buf_ptr1, *buf_ptr2;
if(msg_len > m_max_msglen) {
throw XrdSsiException("Data record size exceeds XRootD SSI buffer size");
}
if(!input_stream.GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr), &buf_len)) buf_len = 0;
std::cout << "[POP_RECORD] buf_len = " << buf_len << ", msg_len = " << msg_len << std::endl;
if(buf_len < msg_len) {
std::cout << "[POP_RECORD] Saving split of 4+" << buf_len << " bytes" << std::endl;
// Record payload is split across the boundary, save the partial record
bool one = first_stream->GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr1), &buf_len1);
bool two = second_stream->GetDirectBufferPointer(reinterpret_cast<const void**>(&buf_ptr2), &buf_len2);
google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(msg_len, m_split_buffer.get());
memcpy(m_split_buffer.get() + sizeof(uint32_t), buf_ptr, buf_len);
m_split_buflen = buf_len + sizeof(uint32_t);
// Get size of next item on the stream
if(buf_len1 < static_cast<int>(sizeof(uint32_t))) {
return false;
} else {
// Case 2: message length is in the first buffer, payload is split over two buffers
first_stream->ReadLittleEndian32(&msg_len);
DataType record;
record.ParseFromArray(buf_ptr, msg_len);
input_stream.Skip(msg_len);
// If the message terminates at the end of the buffer, we are done, otherwise keep going
return buf_len != msg_len;
}
}
#endif
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment