Commit 60f7f657 authored by Michael Davis

[lpa-stream] Packs protocol buffer with reply

parent b6b52c0e
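
This commit converts each pending-queue entry into a protocol buffer record inside the stream class itself: fillRecord() becomes a template member that is specialised per queue-iterator type, and GetBuff() pushes the resulting records into the XrdSsi output stream buffer. The sketch below illustrates the same member-specialisation pattern in isolation; PendingQueueStream, Record and the Fake*Itor types are invented stand-ins, not part of the CTA or XrdSsiPb code. A second sketch after the diff models the buffer-fill contract described in the GetBuff() comment.

// Sketch of the fillRecord() specialisation pattern introduced by this commit.
// All types here (Record, FakeArchiveItor, FakeRetrieveItor, PendingQueueStream)
// are hypothetical stand-ins, not the real CTA or XrdSsiPb classes.
#include <iostream>
#include <string>

struct Record { std::string queue_id; std::string payload; };   // stand-in for cta::xrd::Data

struct FakeArchiveItor  { struct Job { std::string diskFileId; }; };
struct FakeRetrieveItor { struct Job { std::string vid; }; };

template<typename QueueItor_t>
class PendingQueueStream {
public:
  // The real class infers the job type with decltype(m_queueItor.getJob().second);
  // a plain nested typedef keeps the sketch short.
  typedef typename QueueItor_t::Job data_t;

  // Convert one queued job into a stream record; defined once per iterator type below
  Record fillRecord(const std::string &queue_id, const data_t &job);
};

// Archive queues are keyed by tapepool
template<>
Record PendingQueueStream<FakeArchiveItor>::fillRecord(const std::string &queue_id, const data_t &job) {
  return Record{queue_id, "archive:" + job.diskFileId};
}

// Retrieve queues would be keyed by tape VID (illustrative)
template<>
Record PendingQueueStream<FakeRetrieveItor>::fillRecord(const std::string &queue_id, const data_t &job) {
  return Record{queue_id, "retrieve:" + job.vid};
}

int main() {
  PendingQueueStream<FakeArchiveItor> stream;
  Record r = stream.fillRecord("tapepool1", {"disk-file-123"});
  std::cout << r.queue_id << " " << r.payload << "\n";   // prints: tapepool1 archive:disk-file-123
}
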
@@ -19,7 +19,7 @@
 #pragma once
 #include <XrdSsiPbOStreamBuffer.hpp>
-#include <catalogue/Catalogue.hpp>
+#include <scheduler/OStoreDB/OStoreDB.hpp>
@@ -66,7 +66,7 @@ public:
   virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) {
     XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
-    XrdSsiPb::OStreamBuffer<cta::xrd::Data> *streambuf;
+    XrdSsiPb::OStreamBuffer<Data> *streambuf;
     try {
       if(m_queueItor.end()) {
@@ -75,7 +75,7 @@ public:
         return nullptr;
       }
-      streambuf = new XrdSsiPb::OStreamBuffer<cta::xrd::Data>(dlen);
+      streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
       for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; ++m_queueItor)
       {
@@ -83,8 +83,81 @@ public:
         if(!job.first) continue;
-        cta::xrd::Data record = fillRecord(job.second);
+        Data record = fillRecord(m_queueItor.qid(), job.second);
+
+        // is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
+        // enough data to send to the client. The actual buffer size is double the block size,
+        // so we can keep writing a few additional records after is_buffer_full is true. These
+        // will be sent on the next iteration. If we exceed the hard limit of double the block
+        // size, Push() will throw an exception.
+        is_buffer_full = streambuf->Push(record);
+      }
+      dlen = streambuf->Size();
+      XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
+    } catch(exception::Exception &ex) {
+      throw std::runtime_error(ex.getMessage().str());
+    } catch(std::exception &ex) {
+      std::ostringstream errMsg;
+      errMsg << __FUNCTION__ << " failed: " << ex.what();
+      eInfo.Set(errMsg.str().c_str(), ECANCELED);
+      delete streambuf;
+    } catch(...) {
+      std::ostringstream errMsg;
+      errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
+      eInfo.Set(errMsg.str().c_str(), ECANCELED);
+      delete streambuf;
+    }
+    return streambuf;
+  }
+
+private:
+  bool m_isExtended;
+  QueueItor_t m_queueItor;
+
+  typedef decltype(m_queueItor.getJob().second) data_t;                  //!< Infer data type from template type
+
+  Data fillRecord(const std::string &tape_id, const data_t &job);        //!< Convert data to protobuf
+
+  static constexpr const char* const LOG_SUFFIX = "ListPendingQueue";    //!< Identifier for log messages
+};
+
+// Template specialisations for Archive and Retrieve Queue types
+
+template<>
+Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool, const common::dataStructures::ArchiveJob &job)
+{
+  Data record;
+
+  // Tapepool
+  record.mutable_af_ls_item()->set_tapepool(tapepool);
+
+  // Copy number
+  record.mutable_af_ls_item()->set_copy_nb(job.copyNumber);
+
+  // Archive file
+  auto af = record.mutable_af_ls_item()->mutable_af();
+  af->set_archive_id(job.archiveFileID);
+  af->set_disk_instance(job.instanceName);
+  af->set_disk_id(job.request.diskFileID);
+  af->set_size(job.request.fileSize);
+  af->mutable_cs()->set_type(job.request.checksumType);
+  af->mutable_cs()->set_value(job.request.checksumValue);
+  af->set_storage_class(job.request.storageClass);
+  af->mutable_df()->set_owner(job.request.requester.name);
+  af->mutable_df()->set_group(job.request.requester.group);
+  af->mutable_df()->set_path(job.request.diskFileInfo.path);
+
+#if 0
+  if(m_isExtended)
+  {
+  } else {
+    auto lpa = record.mutable_lpa_summary();
+    lpa->set_
+  }
+#endif
 #if 0
   // Copy number
   record.mutable_af_ls_item()->set_copy_nb(jt->first);
@@ -109,46 +182,9 @@ public:
   tf->set_f_seq(jt->second.fSeq);
   tf->set_block_id(jt->second.blockId);
 #endif
-        // is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
-        // enough data to send to the client. The actual buffer size is double the block size,
-        // so we can keep writing a few additional records after is_buffer_full is true. These
-        // will be sent on the next iteration. If we exceed the hard limit of double the block
-        // size, Push() will throw an exception.
-        is_buffer_full = streambuf->Push(record);
-      }
-      dlen = streambuf->Size();
-      XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
-    } catch(cta::exception::Exception &ex) {
-      throw std::runtime_error(ex.getMessage().str());
-    } catch(std::exception &ex) {
-      std::ostringstream errMsg;
-      errMsg << __FUNCTION__ << " failed: " << ex.what();
-      eInfo.Set(errMsg.str().c_str(), ECANCELED);
-      delete streambuf;
-    } catch(...) {
-      std::ostringstream errMsg;
-      errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
-      eInfo.Set(errMsg.str().c_str(), ECANCELED);
-      delete streambuf;
-    }
-    return streambuf;
-  }
 #if 0
   std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob>> result;
-  if(tapepool) {
-    std::list<cta::common::dataStructures::ArchiveJob> list = m_scheduler.getPendingArchiveJobs(tapepool.value(), m_lc);
-    if(!list.empty()) result[tapepool.value()] = list;
-  } else {
-    result = m_scheduler.getPendingArchiveJobs(m_lc);
-  }
-  if(!result.empty())
-  {
-    std::vector<std::vector<std::string>> responseTable;
   if(has_flag(OptionBoolean::EXTENDED))
   {
     for(auto it = result.cbegin(); it != result.cend(); it++) {
@@ -183,20 +219,9 @@ public:
       responseTable.push_back(currentRow);
     }
   }
-  cmdlineOutput << formatResponse(responseTable);
-  }
 #endif
-private:
-  cta::xrd::Data fillRecord(const cta::common::dataStructures::ArchiveJob &job);
-  cta::xrd::Data fillRecord(const cta::common::dataStructures::RetrieveJob &job);
-  bool m_isExtended;
-  QueueItor_t m_queueItor;
-  static constexpr const char* const LOG_SUFFIX = "ListPendingQueue";    //!< Identifier for log messages
-};
+
+  return record;
+}
 }} // namespace cta::xrd
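
The comment in GetBuff() describes the Push() contract: the buffer reports "full" once one block of data has been written, but keeps accepting records up to a hard limit of twice the block size, beyond which Push() throws. A minimal model of that behaviour, assuming only those two thresholds, is sketched below; ModelStreamBuffer is an invented illustration, not the real XrdSsiPb::OStreamBuffer.

// Simplified model of the Push()/is_buffer_full contract described in the diff's
// comment: "full" is reported once one block of data is buffered, records are
// still accepted up to a hard limit of two blocks, and exceeding that limit
// throws. Illustration only; not the actual XrdSsiPb implementation.
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

class ModelStreamBuffer {
public:
  explicit ModelStreamBuffer(size_t blockSize) : m_blockSize(blockSize) {}

  // Append one serialised record; return true once at least one block is buffered
  bool Push(const std::string &record) {
    if(m_bytes + record.size() > 2 * m_blockSize) {
      throw std::runtime_error("record would exceed twice the block size");
    }
    m_bytes += record.size();
    return m_bytes >= m_blockSize;          // "one full block" => stop filling
  }

  size_t Size() const { return m_bytes; }

private:
  size_t m_blockSize;
  size_t m_bytes = 0;
};

int main() {
  ModelStreamBuffer buf(100);
  std::vector<std::string> records(10, std::string(30, 'x'));   // 10 records of 30 bytes each

  // Same loop shape as GetBuff(): stop as soon as one block's worth is buffered
  bool is_buffer_full = false;
  size_t i = 0;
  for( ; i < records.size() && !is_buffer_full; ++i) {
    is_buffer_full = buf.Push(records[i]);
  }
  std::cout << "buffered " << buf.Size() << " bytes from " << i << " records\n";
  // buffered 120 bytes from 4 records: the 4th record tips the total past one block (100 bytes)
}
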