diff --git a/xroot_plugins/XrdCtaListPendingQueue.hpp b/xroot_plugins/XrdCtaListPendingQueue.hpp index 41133e3e0770db66eb66943ba86367c4d59ecc34..dcc647c1a5e947ec165860922c45793e9d5f3a4a 100644 --- a/xroot_plugins/XrdCtaListPendingQueue.hpp +++ b/xroot_plugins/XrdCtaListPendingQueue.hpp @@ -19,7 +19,7 @@ #pragma once #include <XrdSsiPbOStreamBuffer.hpp> -#include <catalogue/Catalogue.hpp> +#include <scheduler/OStoreDB/OStoreDB.hpp> @@ -66,7 +66,7 @@ public: virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) { XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); - XrdSsiPb::OStreamBuffer<cta::xrd::Data> *streambuf; + XrdSsiPb::OStreamBuffer<Data> *streambuf; try { if(m_queueItor.end()) { @@ -75,7 +75,7 @@ public: return nullptr; } - streambuf = new XrdSsiPb::OStreamBuffer<cta::xrd::Data>(dlen); + streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; ++m_queueItor) { @@ -83,8 +83,81 @@ public: if(!job.first) continue; - cta::xrd::Data record = fillRecord(job.second); + Data record = fillRecord(m_queueItor.qid(), job.second); + // is_buffer_full is set to true when we have one full block of data in the buffer, i.e. + // enough data to send to the client. The actual buffer size is double the block size, + // so we can keep writing a few additional records after is_buffer_full is true. These + // will be sent on the next iteration. If we exceed the hard limit of double the block + // size, Push() will throw an exception. + is_buffer_full = streambuf->Push(record); + } + dlen = streambuf->Size(); + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); + } catch(exception::Exception &ex) { + throw std::runtime_error(ex.getMessage().str()); + } catch(std::exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } catch(...) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } + return streambuf; + } + +private: + bool m_isExtended; + QueueItor_t m_queueItor; + + typedef decltype(m_queueItor.getJob().second) data_t; //!< Infer data type from template type + + Data fillRecord(const std::string &tape_id, const data_t &job); //!< Convert data to protobuf + + static constexpr const char* const LOG_SUFFIX = "ListPendingQueue"; //!< Identifier for log messages +}; + + + +// Template specialisations for Archive and Retrieve Queue types + +template<> +Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool, const common::dataStructures::ArchiveJob &job) +{ + Data record; + + // Tapepool + record.mutable_af_ls_item()->set_tapepool(tapepool); + + // Copy number + record.mutable_af_ls_item()->set_copy_nb(job.copyNumber); + + // Archive file + auto af = record.mutable_af_ls_item()->mutable_af(); + af->set_archive_id(job.archiveFileID); + af->set_disk_instance(job.instanceName); + af->set_disk_id(job.request.diskFileID); + af->set_size(job.request.fileSize); + af->mutable_cs()->set_type(job.request.checksumType); + af->mutable_cs()->set_value(job.request.checksumValue); + af->set_storage_class(job.request.storageClass); + af->mutable_df()->set_owner(job.request.requester.name); + af->mutable_df()->set_group(job.request.requester.group); + af->mutable_df()->set_path(job.request.diskFileInfo.path); + +#if 0 + if(m_isExtended) + { + + } else { + auto lpa = record.mutable_lpa_summary(); + lpa->set_ + } +#endif #if 0 // Copy number record.mutable_af_ls_item()->set_copy_nb(jt->first); @@ -109,46 +182,9 @@ public: tf->set_f_seq(jt->second.fSeq); tf->set_block_id(jt->second.blockId); #endif - - // is_buffer_full is set to true when we have one full block of data in the buffer, i.e. - // enough data to send to the client. The actual buffer size is double the block size, - // so we can keep writing a few additional records after is_buffer_full is true. These - // will be sent on the next iteration. If we exceed the hard limit of double the block - // size, Push() will throw an exception. - is_buffer_full = streambuf->Push(record); - } - dlen = streambuf->Size(); - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); - } catch(cta::exception::Exception &ex) { - throw std::runtime_error(ex.getMessage().str()); - } catch(std::exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; - } catch(...) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; - } - return streambuf; - } - #if 0 std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob>> result; - if(tapepool) { - std::list<cta::common::dataStructures::ArchiveJob> list = m_scheduler.getPendingArchiveJobs(tapepool.value(), m_lc); - if(!list.empty()) result[tapepool.value()] = list; - } else { - result = m_scheduler.getPendingArchiveJobs(m_lc); - } - - if(!result.empty()) - { - std::vector<std::vector<std::string>> responseTable; - if(has_flag(OptionBoolean::EXTENDED)) { for(auto it = result.cbegin(); it != result.cend(); it++) { @@ -183,20 +219,9 @@ public: responseTable.push_back(currentRow); } } - - cmdlineOutput << formatResponse(responseTable); - } #endif - -private: - cta::xrd::Data fillRecord(const cta::common::dataStructures::ArchiveJob &job); - cta::xrd::Data fillRecord(const cta::common::dataStructures::RetrieveJob &job); - - bool m_isExtended; - QueueItor_t m_queueItor; - - static constexpr const char* const LOG_SUFFIX = "ListPendingQueue"; //!< Identifier for log messages -}; + return record; +} }} // namespace cta::xrd