Skip to content
Snippets Groups Projects
Commit ae71117d authored by Michael Davis's avatar Michael Davis
Browse files

[ssi_af_ls] Implements multi-item output stream buffer on server side

parent 51729808
No related branches found
No related tags found
No related merge requests found
......@@ -52,7 +52,7 @@ void RequestCallback<cta::xrd::Alert>::operator()(const cta::xrd::Alert &alert)
* Defines how incoming records from the stream should be handled
*/
template<>
void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record)
void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const
{
const cta::admin::ArchiveFileLsItem &af_ls_item = record.af_ls_item();
......
......@@ -69,47 +69,68 @@ public:
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] ArchiveFileLsStream::GetBuff(): XrdSsi buffer fill request (" << dlen << " bytes)" << std::endl;
#endif
if(!m_archiveFileItor.hasMore())
{
// Nothing more to send, close the stream
last = true;
return nullptr;
}
XrdSsiPb::OStreamBuffer *streambuf = new XrdSsiPb::OStreamBuffer();
const cta::common::dataStructures::ArchiveFile archiveFile = m_archiveFileItor.next();
for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) {
cta::xrd::Data record;
// Copy number
record.mutable_af_ls_item()->set_copy_nb(jt->first);
// Archive file
auto af = record.mutable_af_ls_item()->mutable_af();
af->set_archive_id(archiveFile.archiveFileID);
af->set_disk_instance(archiveFile.diskInstance);
af->set_disk_id(archiveFile.diskFileId);
af->set_size(archiveFile.fileSize);
af->mutable_cs()->set_type(archiveFile.checksumType);
af->mutable_cs()->set_value(archiveFile.checksumValue);
af->set_storage_class(archiveFile.storageClass);
af->mutable_df()->set_owner(archiveFile.diskFileInfo.owner);
af->mutable_df()->set_group(archiveFile.diskFileInfo.group);
af->mutable_df()->set_path(archiveFile.diskFileInfo.path);
af->set_creation_time(archiveFile.creationTime);
// Tape file
auto tf = record.mutable_af_ls_item()->mutable_tf();
tf->set_vid(jt->second.vid);
tf->set_f_seq(jt->second.fSeq);
tf->set_block_id(jt->second.blockId);
dlen = streambuf->serialize(record);
XrdSsiPb::OStreamBuffer<cta::xrd::Data> *streambuf;
try {
if(!m_archiveFileItor.hasMore()) {
// Nothing more to send, close the stream
last = true;
return nullptr;
}
streambuf = new XrdSsiPb::OStreamBuffer<cta::xrd::Data>(dlen);
for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; )
{
const cta::common::dataStructures::ArchiveFile archiveFile = m_archiveFileItor.next();
for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) {
cta::xrd::Data record;
// Copy number
record.mutable_af_ls_item()->set_copy_nb(jt->first);
// Archive file
auto af = record.mutable_af_ls_item()->mutable_af();
af->set_archive_id(archiveFile.archiveFileID);
af->set_disk_instance(archiveFile.diskInstance);
af->set_disk_id(archiveFile.diskFileId);
af->set_size(archiveFile.fileSize);
af->mutable_cs()->set_type(archiveFile.checksumType);
af->mutable_cs()->set_value(archiveFile.checksumValue);
af->set_storage_class(archiveFile.storageClass);
af->mutable_df()->set_owner(archiveFile.diskFileInfo.owner);
af->mutable_df()->set_group(archiveFile.diskFileInfo.group);
af->mutable_df()->set_path(archiveFile.diskFileInfo.path);
af->set_creation_time(archiveFile.creationTime);
// Tape file
auto tf = record.mutable_af_ls_item()->mutable_tf();
tf->set_vid(jt->second.vid);
tf->set_f_seq(jt->second.fSeq);
tf->set_block_id(jt->second.blockId);
is_buffer_full = streambuf->Push(record);
}
}
dlen = streambuf->Size();
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] ArchiveFileLsStream::GetBuff(): Returning buffer with " << dlen << " bytes of data." << std::endl;
#endif
} catch(cta::exception::Exception &ex) {
throw std::runtime_error(ex.getMessage().str());
} catch(std::exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
delete streambuf;
} catch(...) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
eInfo.Set(errMsg.str().c_str(), ECANCELED);
delete streambuf;
}
std::cerr << "Returning buffer with " << dlen << " bytes of data." << std::endl;
return streambuf;
}
......@@ -117,11 +138,5 @@ private:
catalogue::ArchiveFileItor m_archiveFileItor;
};
#if 0
ArchiveFileLsStream(m_catalogue.getArchiveFiles(searchCriteria), has_flag(OptionBoolean::SHOW_HEADER));
m_listArchiveFilesCmd.reset(new xrootPlugins::ListArchiveFilesCmd(xrdSfsFileError, has_flag(OptionBoolean::SHOW_HEADER), std::move(archiveFileItor)));
#endif
}} // namespace cta::xrd
......@@ -66,7 +66,7 @@ public:
* @param[in] buf_ptr XRootD SSI stream or data buffer
* @param[in] buf_len Size of buf_ptr
*/
void push(const char *buf_ptr, int buf_len);
void Push(const char *buf_ptr, int buf_len);
private:
/*!
......@@ -85,9 +85,11 @@ private:
bool popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream);
/*!
* Callback to handle stream data
*
* Define a specialised version of this method on the client side to handle a specific type of data stream
*/
void DataCallback(DataType record) {
void DataCallback(DataType record) const {
throw XrdSsiException("Stream/data payload received, but IStreamBuffer::DataCallback() has not been defined");
}
......@@ -101,7 +103,7 @@ private:
template<typename DataType>
void IStreamBuffer<DataType>::push(const char *buf_ptr, int buf_len)
void IStreamBuffer<DataType>::Push(const char *buf_ptr, int buf_len)
{
google::protobuf::io::CodedInputStream input_stream(reinterpret_cast<const uint8_t*>(buf_ptr), buf_len);
......
......@@ -27,15 +27,28 @@ namespace XrdSsiPb {
*
* This class binds XrdSsiStream::Buffer to the stream interface.
*
* This is a naïve implementation, where memory is allocated and deallocated on every use.
* This is a naïve implementation, where memory is allocated and deallocated on every use. It favours
* computation efficiency over memory efficiency: the buffer allocated is twice the hint size, but data
* copying is minimized.
*
* A more performant implementation could be implemented using a buffer pool, using the Recycle()
* method to return buffers to the pool.
*/
template<typename DataType>
class OStreamBuffer : public XrdSsiStream::Buffer
{
public:
OStreamBuffer() : XrdSsiStream::Buffer(nullptr) {
/*!
* Constructor
*
* data is a public member of XrdSsiStream::Buffer. It is an unmanaged char* pointer. We initialize
* it to double the hint size, with the implicit rule that the size of an individual serialized
* record on the wire cannot exceed the hint size.
*/
OStreamBuffer(uint32_t hint_size) : XrdSsiStream::Buffer(new char[hint_size * 2]),
m_hint_size(hint_size),
m_data_ptr(data),
m_data_size(0) {
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] OStreamBuffer() constructor" << std::endl;
#endif
......@@ -45,36 +58,57 @@ public:
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] OStreamBuffer() destructor" << std::endl;
#endif
// data is a public member of XrdSsiStream::Buffer. It is an unmanaged char* pointer.
delete[] data;
}
//! Serialize a Protocol Buffer into the Stream Buffer and return its size
template<typename ProtoBuf>
int serialize(const ProtoBuf &pb) {
// Drop any existing data. A more sophisticated implementation could chain new objects onto the buffer,
// using XrdSsiStream::Buffer::Next
delete[] data;
/*!
* Get the data size
*/
uint32_t Size() const {
return m_data_size;
}
/*!
* Push a protobuf into the queue
*
* @retval true The buffer has been filled and is ready for sending
* @retval false There is room in the buffer for more records
*/
bool Push(const DataType &record) {
uint32_t bytesize = record.ByteSize();
uint32_t bytesize = pb.ByteSize();
data = new char[bytesize + sizeof(uint32_t)];
if(m_data_size + bytesize > m_hint_size * 2) {
throw XrdSsiException("OStreamBuffer::Push(): Stream buffer overflow");
}
// Write the size into the buffer
google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(bytesize, reinterpret_cast<google::protobuf::uint8*>(data));
// Write the size of the next record into the buffer
google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(bytesize, reinterpret_cast<google::protobuf::uint8*>(m_data_ptr));
m_data_ptr += sizeof(uint32_t);
// Serialize the Protocol Buffer
pb.SerializeToArray(data + sizeof(uint32_t), bytesize);
record.SerializeToArray(m_data_ptr, bytesize);
m_data_ptr += bytesize;
// Return the total size of the buffer including bytesize
return bytesize + sizeof(uint32_t);
m_data_size += sizeof(uint32_t) + bytesize;
return m_data_size >= m_hint_size;
}
private:
//! Called by the XrdSsi framework when it is finished with the object
/*!
* Called by the XrdSsi framework when it is finished with the object
*/
virtual void Recycle() {
std::cerr << "[DEBUG] OStreamBuffer::Recycle()" << std::endl;
delete this;
}
// Member variables
const uint32_t m_hint_size; //!< Requested size of the buffer from the XRootD framework
char *m_data_ptr; //!< Pointer to the raw storage
uint32_t m_data_size; //!< Total size of the buffer on the wire
std::vector<std::pair<uint32_t, DataType>> m_protobuf_q; //!< Queue of protobufs to be serialized
};
} // namespace XrdSsiPb
......
......@@ -309,7 +309,7 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, DataType, AlertType>
if(response_buflen != 0)
{
// Push stream/data buffer onto the input stream for the client
m_istream_buffer.push(response_bufptr, response_buflen);
m_istream_buffer.Push(response_bufptr, response_buflen);
}
if(is_last) // No more data to come
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment