From 63aa0c6546e1808d52b7a81a5ffe9d61a82570e0 Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Thu, 23 Nov 2017 17:35:58 +0100 Subject: [PATCH] [ssi_af_ls] Consumes protocol buffers one-at-a-time from a byte stream --- cmdline/CtaAdminCmd.cpp | 44 ++++++++++++++++++++++++++- xroot_plugins/XrdCtaArchiveFileLs.hpp | 4 +-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index 72d8838397..5f07b9ad1d 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -25,6 +25,8 @@ #include "CtaAdminCmd.hpp" #include "XrdSsiPbDebug.hpp" +// Move to generic headers +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> // Define XRootD SSI Alert message callback @@ -52,9 +54,48 @@ void RequestCallback<cta::xrd::Alert>::operator()(const cta::xrd::Alert &alert) template<> void XrdSsiPbRequestType::DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen) { + google::protobuf::io::ArrayInputStream raw_stream(response_bufptr, response_buflen); + + //google::protobuf::io::CodedInputStream coded_stream(&raw_stream); + // coded_in->ReadVarint32(&n); + // cout << "#" << n << endl; + // + // std::string s; + // + // for (uint32_t i = 0; i < n; ++i) + // { + // uint32_t msgSize; + // coded_in->ReadVarint32(&msgSize); + // + // if ((msgSize > 0) && + // (coded_in->ReadString(&s, msgSize))) + // { + // + // Person p; + // p.ParseFromString(s); + // + // cout << "ID: " << p.id() << endl; + // cout << "name: " << p.name() << endl; + // cout << "gendre: " << gendres[p.gendre()-1] << endl; + // if (p.has_email()) + // { + // cout << "e-mail: " << p.email() << endl; + // } + // + // cout << endl; + // } + // } + // + // delete coded_in; + // delete raw_in; + cta::admin::ArchiveFileLsItem item; - item.ParseFromArray(response_bufptr, response_buflen); + for(int i = 0; i < 10; ++i) + { + item.ParseFromBoundedZeroCopyStream(&raw_stream, 18); + + OutputJsonString(std::cout, &item); std::cout << std::setfill(' ') << std::setw(7) << std::right << item.af().archive_file_id() << ' ' << std::setfill(' ') << std::setw(7) << std::right << item.copy_nb() << ' ' @@ -71,6 +112,7 @@ void XrdSsiPbRequestType::DataCallback(XrdSsiRequest::PRD_Xeq &post_process, cha << std::setfill(' ') << std::setw(8) << std::right << item.af().df().group() << ' ' << std::setfill(' ') << std::setw(13) << std::right << item.af().creation_time() << ' ' << item.af().df().path() << std::endl; + } } diff --git a/xroot_plugins/XrdCtaArchiveFileLs.hpp b/xroot_plugins/XrdCtaArchiveFileLs.hpp index dba1b755bc..3070de8b32 100644 --- a/xroot_plugins/XrdCtaArchiveFileLs.hpp +++ b/xroot_plugins/XrdCtaArchiveFileLs.hpp @@ -153,7 +153,7 @@ tmp_num_items = 0; #ifdef XRDSSI_DEBUG std::cerr << "[DEBUG] ArchiveFileLsStream::GetBuff(): XrdSsi buffer fill request (" << dlen << " bytes)" << std::endl; #endif - if(tmp_num_items >= 10) + if(tmp_num_items > 9) { // Nothing more to send, close the stream last = true; @@ -164,7 +164,7 @@ tmp_num_items = 0; cta::admin::ArchiveFileLsItem item; item.mutable_af()->set_disk_instance("Hello"); item.mutable_af()->set_disk_file_id("World"); - item.set_copy_nb(tmp_num_items++); + item.set_copy_nb(++tmp_num_items); std::string bufstr; item.SerializeToString(&bufstr); -- GitLab