diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index 0fddca711ad75d5ccaa3058d8c14a33cdb823a53..06188c09b9372d089b56cb321381d1b5434657c0 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -382,8 +382,6 @@ void CtaAdminCmd::printAfLsHeader() << TEXT_NORMAL << std::endl; } - - void CtaAdminCmd::printAfLsItem(const cta::admin::ArchiveFileItem &af_item) { std::cout << std::setfill(' ') << std::setw(7) << std::right << af_item.af().archive_id() << ' ' @@ -404,8 +402,6 @@ void CtaAdminCmd::printAfLsItem(const cta::admin::ArchiveFileItem &af_item) << std::endl; } - - void CtaAdminCmd::printLpaHeader() { std::cout << TEXT_RED @@ -424,8 +420,6 @@ void CtaAdminCmd::printLpaHeader() << TEXT_NORMAL << std::endl; } - - void CtaAdminCmd::printLpaItem(const cta::admin::ArchiveFileItem &af_item) { std::cout << std::setfill(' ') << std::setw(18) << std::right << af_item.tapepool() << ' ' @@ -443,8 +437,6 @@ void CtaAdminCmd::printLpaItem(const cta::admin::ArchiveFileItem &af_item) << std::endl; } - - void CtaAdminCmd::printLpaSummaryHeader() { std::cout << TEXT_RED @@ -454,8 +446,6 @@ void CtaAdminCmd::printLpaSummaryHeader() << TEXT_NORMAL << std::endl; } - - void CtaAdminCmd::printLpaSummaryItem(const cta::admin::ArchiveFileSummaryItem &af_summary_item) { std::cout << std::setfill(' ') << std::setw(18) << std::right << af_summary_item.tapepool() << ' ' @@ -464,6 +454,38 @@ void CtaAdminCmd::printLpaSummaryItem(const cta::admin::ArchiveFileSummaryItem & << std::endl; } +void CtaAdminCmd::printLprHeader() +{ + std::cout << TEXT_RED + << std::setfill(' ') << std::setw(13) << std::right << "vid" << ' ' + << std::setfill(' ') << std::setw(7) << std::right << "id" << ' ' + << std::setfill(' ') << std::setw(7) << std::right << "copy no" << ' ' + << std::setfill(' ') << std::setw(7) << std::right << "fseq" << ' ' + << std::setfill(' ') << std::setw(9) << std::right << "block id" << ' ' + << std::setfill(' ') << std::setw(12) << std::right << "size" << ' ' + << std::setfill(' ') << std::setw(8) << std::right << "user" << ' ' + << std::setfill(' ') << std::setw(8) << std::right << "group" << ' ' + << "path" + << TEXT_NORMAL << std::endl; +} + +void CtaAdminCmd::printLprItem(const cta::admin::ArchiveFileItem &af_item) +{ +} + +void CtaAdminCmd::printLprSummaryHeader() +{ + std::cout << TEXT_RED + << std::setfill(' ') << std::setw(13) << std::right << "vid" << ' ' + << std::setfill(' ') << std::setw(13) << std::right << "total files" << ' ' + << std::setfill(' ') << std::setw(12) << std::right << "total size" << ' ' + << TEXT_NORMAL << std::endl; +} + +void CtaAdminCmd::printLprSummaryItem(const cta::admin::ArchiveFileSummaryItem &af_summary_item) +{ +} + }} // namespace cta::admin diff --git a/cmdline/CtaAdminCmd.hpp b/cmdline/CtaAdminCmd.hpp index ee497b841fc5ba0fc1bea683d453e482dc5caba6..931d3197bd2289b129c33bc5649eaaa99885db51 100644 --- a/cmdline/CtaAdminCmd.hpp +++ b/cmdline/CtaAdminCmd.hpp @@ -46,6 +46,12 @@ public: static void printLpaSummaryHeader(); static void printLpaSummaryItem(const ArchiveFileSummaryItem &af_summary_item); + // "listpendingretrieves" command + static void printLprHeader(); + static void printLprItem(const ArchiveFileItem &af_item); + static void printLprSummaryHeader(); + static void printLprSummaryItem(const ArchiveFileSummaryItem &af_summary_item); + private: /*! * Parse the options for a specific command/subcommand diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 6454c52b7c06e1ad4544735af724bee7cb233288..1a2ca6d62bcea03af1d54e1bbd6a78a1df118804 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -848,6 +848,15 @@ OStoreDB::getRetrieveJobs() const return ret; } +//------------------------------------------------------------------------------ +// OStoreDB::getRetrieveJobItor() +//------------------------------------------------------------------------------ +OStoreDB::RetrieveQueueItor_t OStoreDB::getRetrieveJobItor(const std::string &vid) const +{ + return RetrieveQueueItor_t(m_objectStore, vid); +} + + //------------------------------------------------------------------------------ // OStoreDB::getDriveStates() //------------------------------------------------------------------------------ diff --git a/xroot_plugins/XrdCtaListPendingQueue.hpp b/xroot_plugins/XrdCtaListPendingQueue.hpp index 6620be335790d87ffb4d5a8be938ecd318c49378..24acebff76c3ab364081e1e007e95544d266ff99 100644 --- a/xroot_plugins/XrdCtaListPendingQueue.hpp +++ b/xroot_plugins/XrdCtaListPendingQueue.hpp @@ -110,7 +110,7 @@ public: auto job = m_queueItor.getJob(); if(job.first) { ++total_files; - total_size += job.second.request.fileSize; + total_size += fileSize(job.second); } } @@ -148,12 +148,13 @@ private: Data fillRecord(const std::string &tape_id, const uint64_t &total_files, const uint64_t &total_size); //!< Convert summary to protobuf + static uint64_t fileSize(const data_t &job); //!< Obtain file size from queue item static constexpr const char* const LOG_SUFFIX = "ListPendingQueue"; //!< Identifier for log messages }; -// Template specialisations for Archive and Retrieve Queue types +// Template specialisations for Archive Queue types template<> Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool, @@ -205,4 +206,113 @@ Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::strin return record; } +template<> +uint64_t ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fileSize(const data_t &job) { + return job.request.fileSize; +} + + + +// Template specialisations for Retrieve Queue types + +template<> +Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::string &vid, + const common::dataStructures::RetrieveJob &job) +{ + +#if 0 + if(vid) { + std::list<cta::common::dataStructures::RetrieveJob> list = m_scheduler.getPendingRetrieveJobs(vid.value(), m_lc); + if(!list.empty()) result[vid.value()] = list; + } else { + result = m_scheduler.getPendingRetrieveJobs(m_lc); + } + + for(auto it = result.cbegin(); it != result.cend(); it++) + { + for(auto jt = it->second.cbegin(); jt != it->second.cend(); jt++) + { + currentRow.push_back(jt->request.requester.name); + currentRow.push_back(jt->request.requester.group); + currentRow.push_back(jt->request.diskFileInfo.path); + responseTable.push_back(currentRow); + } + } +#endif + Data record; + + // Response type + record.mutable_af_item()->set_type(cta::admin::ArchiveFileItem::LISTPENDINGARCHIVES); + + // Tapepool + record.mutable_af_item()->set_vid(vid); + + // Retrieve file + auto af = record.mutable_af_item()->mutable_af(); + af->set_archive_id(job.request.archiveFileID); +#if 0 + cta::common::dataStructures::ArchiveFile file = m_catalogue.getArchiveFileById(jt->request.archiveFileID); + currentRow.push_back(std::to_string(static_cast<unsigned long long>((jt->tapeCopies.at(it->first).first)))); + currentRow.push_back(std::to_string(static_cast<unsigned long long>((jt->tapeCopies.at(it->first).second.fSeq)))); + currentRow.push_back(std::to_string(static_cast<unsigned long long>((jt->tapeCopies.at(it->first).second.blockId)))); + currentRow.push_back(std::to_string(static_cast<unsigned long long>(file.fileSize))); + af->set_disk_instance(job.instanceName); + af->set_disk_id(job.request.diskFileID); + af->set_size(job.request.fileSize); + af->mutable_cs()->set_type(job.request.checksumType); + af->mutable_cs()->set_value(job.request.checksumValue); + af->set_storage_class(job.request.storageClass); +#endif + af->mutable_df()->set_owner(job.request.requester.name); + af->mutable_df()->set_group(job.request.requester.group); + af->mutable_df()->set_path(job.request.diskFileInfo.path); + + return record; +} + +template<> +Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::string &tapepool, + const uint64_t &total_files, const uint64_t &total_size) +{ +#if 0 + for(auto it = result.cbegin(); it != result.cend(); it++) + { + std::vector<std::string> currentRow; + currentRow.push_back(it->first); + currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->second.size()))); + uint64_t size = 0; + for(auto jt = it->second.cbegin(); jt != it->second.cend(); jt++) + { + } + currentRow.push_back(std::to_string(static_cast<unsigned long long>(size))); + responseTable.push_back(currentRow); + } + } + cmdlineOutput << formatResponse(responseTable); +#endif + Data record; + + // Response type + record.mutable_af_summary_item()->set_type(cta::admin::ArchiveFileSummaryItem::LISTPENDINGRETRIEVES); + + // Tapepool + record.mutable_af_summary_item()->set_tapepool(tapepool); + + // Summary statistics + record.mutable_af_summary_item()->set_total_files(total_files); + record.mutable_af_summary_item()->set_total_size(total_size); + + return record; +} + +template<> +uint64_t ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fileSize(const data_t &job) { +#if 0 + cta::common::dataStructures::ArchiveFile file = m_catalogue.getArchiveFileById(jt->request.archiveFileID); + size += file.fileSize; + return job.request.fileSize; +#endif + return 0; +} + }} // namespace cta::xrd diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 219883bb8f62a11c24a83c9a771186d2a52fdbee..6c5fe6582cb9d33011f712ef101ef3df130883a3 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -168,7 +168,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons processListPendingArchives(request.admincmd(), response, stream); break; case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE): - processListPendingRetrieves(request.admincmd(), response); + processListPendingRetrieves(request.admincmd(), response, stream); break; case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD): processLogicalLibrary_Add(request.admincmd(), response); @@ -1216,7 +1216,7 @@ void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admi { using namespace cta::admin; - // Search filter criteria + // Filter criteria auto tapepool = getOptional(OptionString::TAPE_POOL); // Create a XrdSsi stream object to return the results @@ -1237,70 +1237,26 @@ void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admi -void RequestMessage::processListPendingRetrieves(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response) +void RequestMessage::processListPendingRetrieves(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream) { using namespace cta::admin; - std::stringstream cmdlineOutput; - - std::map<std::string, std::list<cta::common::dataStructures::RetrieveJob> > result; - + // Filter criteria auto vid = getOptional(OptionString::VID); - if(vid) { - std::list<cta::common::dataStructures::RetrieveJob> list = m_scheduler.getPendingRetrieveJobs(vid.value(), m_lc); - if(!list.empty()) result[vid.value()] = list; - } else { - result = m_scheduler.getPendingRetrieveJobs(m_lc); - } - - if(!result.empty()) - { - std::vector<std::vector<std::string>> responseTable; + // Create a XrdSsi stream object to return the results + stream = new ListPendingQueue<OStoreDB::RetrieveQueueItor_t>(has_flag(OptionBoolean::EXTENDED), + m_scheddb.getRetrieveJobItor(vid ? vid.value() : "")); + // Should the client display column headers? + if(has_flag(OptionBoolean::SHOW_HEADER)) { if(has_flag(OptionBoolean::EXTENDED)) { - std::vector<std::string> header = {"vid","id","copy no.","fseq","block id","size","user","group","path"}; - if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header); - for(auto it = result.cbegin(); it != result.cend(); it++) - { - for(auto jt = it->second.cbegin(); jt != it->second.cend(); jt++) - { - std::vector<std::string> currentRow; - currentRow.push_back(it->first); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(jt->request.archiveFileID))); - cta::common::dataStructures::ArchiveFile file = m_catalogue.getArchiveFileById(jt->request.archiveFileID); - currentRow.push_back(std::to_string(static_cast<unsigned long long>((jt->tapeCopies.at(it->first).first)))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>((jt->tapeCopies.at(it->first).second.fSeq)))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>((jt->tapeCopies.at(it->first).second.blockId)))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(file.fileSize))); - currentRow.push_back(jt->request.requester.name); - currentRow.push_back(jt->request.requester.group); - currentRow.push_back(jt->request.diskFileInfo.path); - responseTable.push_back(currentRow); - } - } + response.set_show_header(HeaderType::LISTPENDINGRETRIEVES); } else { - std::vector<std::string> header = {"vid","total files","total size"}; - if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header); - for(auto it = result.cbegin(); it != result.cend(); it++) - { - std::vector<std::string> currentRow; - currentRow.push_back(it->first); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->second.size()))); - uint64_t size = 0; - for(auto jt = it->second.cbegin(); jt != it->second.cend(); jt++) - { - cta::common::dataStructures::ArchiveFile file = m_catalogue.getArchiveFileById(jt->request.archiveFileID); - size += file.fileSize; - } - currentRow.push_back(std::to_string(static_cast<unsigned long long>(size))); - responseTable.push_back(currentRow); - } + response.set_show_header(HeaderType::LISTPENDINGRETRIEVES_SUMMARY); } - cmdlineOutput << formatResponse(responseTable); } - response.set_message_txt(cmdlineOutput.str()); response.set_type(cta::xrd::Response::RSP_SUCCESS); } diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index de53b647bbfa0ad05169cfcf6822c68908876792..f7deb96a3bb0dc0a7307dcbb8285111c7f712ae7 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -151,7 +151,7 @@ private: admincmdstream_t processArchiveFile_Ls; admincmdstream_t processListPendingArchives; - admincmd_t processListPendingRetrieves; // TODO: convert to stream format + admincmdstream_t processListPendingRetrieves; /*! * Log an admin command