Commit e7582ee3 authored by Steven Murray's avatar Steven Murray
Browse files

cta/CTA#139 cta af ls --vid XXXXX command times out if there are many files on a tape

First version of a streaming "cta archivefile ls" command.
parent 34503ff6
......@@ -167,7 +167,14 @@ int sendCommand(const int argc, const char **argv) {
}
if(bytesRead > 0) {
std::cout << buf;
for(std::string::size_type i = 0; i < bytesRead; i++) {
const char c = buf[i];
if(0 == c) {
std::cout << "<NULL>";
} else {
std::cout << c;
}
}
if(writeToStderr) {
std::cerr<<buf;
......
......@@ -24,7 +24,7 @@ include_directories(${CMAKE_SOURCE_DIR}/tapeserver)
find_package(Protobuf3 REQUIRED)
include_directories(${PROTOBUF3_INCLUDE_DIRS})
add_library (XrdCtaOfs MODULE XrdCtaFilesystem.cpp XrdCtaFile.cpp XrdCtaDir.cpp)
add_library (XrdCtaOfs MODULE ListArchiveFilesCmd.cpp XrdCtaFilesystem.cpp XrdCtaFile.cpp XrdCtaDir.cpp)
target_link_libraries (XrdCtaOfs ctacatalogue ctaeosmessages ctascheduler ctacommon ${PROTOBUF3_LIBRARIES} ctaobjectstore cryptopp)
set_target_properties(XrdCtaOfs PROPERTIES INSTALL_RPATH ${PROTOBUF3_RPATH})
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "xroot_plugins/ListArchiveFilesCmd.hpp"
#include <sstream>
#include <stdint.h>
namespace cta {
namespace xrootPlugins {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
ListArchiveFilesCmd::ListArchiveFilesCmd(
log::Logger &log,
XrdOucErrInfo &xrdSfsFileError,
const bool displayHeader,
const catalogue::TapeFileSearchCriteria &searchCriteria,
catalogue::Catalogue &catalogue):
m_log(log),
m_xrdSfsFileError(xrdSfsFileError),
m_displayHeader(displayHeader),
m_archiveFileItor(catalogue.getArchiveFileItor(searchCriteria)) {
}
//------------------------------------------------------------------------------
// read
//------------------------------------------------------------------------------
XrdSfsXferSize ListArchiveFilesCmd::read(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize size) {
if(State::LISTED_LAST_ARCHIVE_FILE == m_state) {
return SFS_OK;
}
if(State::WAITING_FOR_FIRST_READ == m_state) {
m_state = State::LISTING_ARCHIVE_FILES;
// The first character of the reply stream is the return code
m_readBuffer = "0";
if(m_displayHeader) {
m_readBuffer += "id copy no vid fseq block id instance disk id size checksum type checksum value "
"storage class owner group path creation time\n";
}
}
if(offset != m_expectedFileOffset) {
std::ostringstream errMsg;
errMsg << "Unexpected read offset: expected=" << m_expectedFileOffset << " actual=" << offset;
m_xrdSfsFileError.setErrInfo(ESPIPE, errMsg.str().c_str());
return SFS_ERROR;
}
XrdSfsXferSize offSetIntoReadBuffer = offset - m_fileOffsetOfReadBuffer;
if(0 > offSetIntoReadBuffer) {
std::ostringstream errMsg;
errMsg << "offSetIntoReadBuffer must be positive: actual=" << offSetIntoReadBuffer;
m_xrdSfsFileError.setErrInfo(ESPIPE, errMsg.str().c_str());
return SFS_ERROR;
}
XrdSfsXferSize nbBytesToBeReadFromBuffer = (XrdSfsXferSize)m_readBuffer.size() - offSetIntoReadBuffer;
if(0 > nbBytesToBeReadFromBuffer) {
std::ostringstream errMsg;
errMsg << "nbBytesToBeReadFromBuffer must be positive: actual=" << nbBytesToBeReadFromBuffer;
m_xrdSfsFileError.setErrInfo(ESPIPE, errMsg.str().c_str());
return SFS_ERROR;
}
if(nbBytesToBeReadFromBuffer == 0) {
refreshReadBuffer();
if (m_readBuffer.empty()) {
m_state = State::LISTED_LAST_ARCHIVE_FILE;
return SFS_OK;
}
m_fileOffsetOfReadBuffer = offset;
offSetIntoReadBuffer = 0;
nbBytesToBeReadFromBuffer = m_readBuffer.size();
}
const XrdSfsXferSize actualNbBytesToRead = nbBytesToBeReadFromBuffer >= size ? size : nbBytesToBeReadFromBuffer;
if(0 > actualNbBytesToRead) {
std::ostringstream errMsg;
errMsg << "actualNbBytesToRead must be positive: actual=" << actualNbBytesToRead;
m_xrdSfsFileError.setErrInfo(ESPIPE, errMsg.str().c_str());
return SFS_ERROR;
}
strncpy(buffer, m_readBuffer.c_str() + offSetIntoReadBuffer, actualNbBytesToRead);
m_expectedFileOffset += actualNbBytesToRead;
return actualNbBytesToRead;
}
//------------------------------------------------------------------------------
// refreshReadBuffer
//------------------------------------------------------------------------------
void ListArchiveFilesCmd::refreshReadBuffer() {
m_readBuffer.clear();
if(m_archiveFileItor->hasMore()) {
const common::dataStructures::ArchiveFile archiveFile = m_archiveFileItor->next();
for(auto copyNbToTapeFile: archiveFile.tapeFiles) {
const auto copyNb = copyNbToTapeFile.first;
const common::dataStructures::TapeFile &tapeFile = copyNbToTapeFile.second;
m_readBuffer +=
std::to_string((unsigned long long) archiveFile.archiveFileID) + " " +
std::to_string((unsigned long long) copyNb) + " " +
tapeFile.vid + " " +
std::to_string((unsigned long long) tapeFile.fSeq) + " " +
std::to_string((unsigned long long) tapeFile.blockId) + " " +
archiveFile.diskInstance + " " +
archiveFile.diskFileId + " " +
std::to_string((unsigned long long) archiveFile.fileSize) + " " +
archiveFile.checksumType + " " +
archiveFile.checksumValue + " " +
archiveFile.storageClass + " " +
archiveFile.diskFileInfo.owner + " " +
archiveFile.diskFileInfo.group + " " +
archiveFile.diskFileInfo.path + " " +
std::to_string((unsigned long long) archiveFile.creationTime) + "\n";
}
}
}
//------------------------------------------------------------------------------
// stateToStr
//------------------------------------------------------------------------------
std::string ListArchiveFilesCmd::stateToStr(const State state) const {
switch(state) {
case WAITING_FOR_FIRST_READ:
return "WAITING_FOR_FIRST_READ";
case LISTING_ARCHIVE_FILES:
return "LISTING_ARCHIVE_FILES";
case LISTED_LAST_ARCHIVE_FILE:
return "LISTED_LAST_ARCHIVE_FILE";
default:
return "UNKNOWN";
}
}
} // namespace xrootPlugins
} // namespace cta
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "catalogue/Catalogue.hpp"
#include "catalogue/TapeFileSearchCriteria.hpp"
#include "common/log/Logger.hpp"
#include <XrdSfs/XrdSfsInterface.hh>
#include <memory>
#include <string>
namespace cta {
namespace xrootPlugins {
/**
* Server-side class implementing the listing of one or more archive files.
*/
class ListArchiveFilesCmd {
public:
/**
* Constructor.
*
* @param log The object representing the API of the CTA logging system.
* @param xrdSfsFileError The error member-variable of the XrdSfsFile class.
* @param displayHeader Set to true if the user requested the header to be
* displayed.
* @param searchCriteria The collection of criteria used to select a set of
* tape files.
* @param catalogue The CTA catalogue.
*/
ListArchiveFilesCmd(
log::Logger &log,
XrdOucErrInfo &xrdSfsFileError,
const bool displayHeader,
const catalogue::TapeFileSearchCriteria &searchCriteria,
catalogue::Catalogue &catalogue);
/**
* Indirectly implements the XrdSfsFile::read() method.
*/
XrdSfsXferSize read(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize size);
protected:
/**
* The object representing the API of the CTA logging system.
*/
log::Logger &m_log;
/**
* The error member-variable of the XrdSfsFile class.
*/
XrdOucErrInfo &m_xrdSfsFileError;
/**
* Set to true if the user requested the columns header to be display.
*/
bool m_displayHeader = false;
/**
* Enumeration of the possible states of the "list archive files" command.
*/
enum State {
WAITING_FOR_FIRST_READ,
LISTING_ARCHIVE_FILES,
LISTED_LAST_ARCHIVE_FILE};
/**
* Returns the string representation of the specified state.
* @param state The state.
* @return The string representation of the state.
*/
std::string stateToStr(const State state) const;
/**
* The current state of the "list archive files" command.
*/
State m_state = WAITING_FOR_FIRST_READ;
/**
* Iterator over the archive files in the CTA catalogue that are to be listed.
*/
std::unique_ptr<catalogue::ArchiveFileItor> m_archiveFileItor;
/**
* The buffer that sits between calls to ListArchiveFilesCmd::read() and calls
* to the CTA catalogue.
*
* The XRootD server calls XrdCtaFile::read() which in turn calls
* ListArchiveFilesCmd::read() which in turn calls the CTA catalogue. Both
* read() method are byte oriented whereas the CTA catalogue is record
* oriented. This buffer ties the byte oriented read() calls to the records
* of the CTA catalogue.
*
*/
std::string m_readBuffer;
/**
* The offset in the reply file at which the current m_readBuffer starts.
*/
XrdSfsFileOffset m_fileOffsetOfReadBuffer = 0;
/**
* The file offset expected in the next call to ListArchiveFilesCmd::read();
*/
XrdSfsFileOffset m_expectedFileOffset = 0;
/**
* Refreshes the read buffer by first clearing it and then attempting to write
* into it more records from the CTA catalogue.
*/
void refreshReadBuffer();
}; // class ListArchiveFilesCmd
} // namespace xrootPlugins
} // namespace cta
......@@ -267,9 +267,9 @@ int XrdCtaFile::getMmap(void **Addr, off_t &Size) {
// read
//------------------------------------------------------------------------------
XrdSfsXferSize XrdCtaFile::read(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize size) {
if (m_listingArchiveFiles) {
if (nullptr != m_listArchiveFilesCmd.get()) {
// Temporarily treat the "cta archive ls" command the same as all the others
return readFromCmdlineOutput(offset, buffer, size);
return m_listArchiveFilesCmd->read(offset, buffer, size);
} else {
return readFromCmdlineOutput(offset, buffer, size);
}
......@@ -1622,7 +1622,9 @@ std::string XrdCtaFile::xCom_archivefile() {
checkOptions(help.str());
}
if(!summary) {
m_listingArchiveFiles = true;
const bool displayHeader = hasOption("-h", "--header");
m_listArchiveFilesCmd.reset(new ListArchiveFilesCmd(m_log, error, displayHeader, searchCriteria, *m_catalogue));
/*
std::unique_ptr<cta::catalogue::ArchiveFileItor> itor = m_catalogue->getArchiveFileItor(searchCriteria);
if(itor->hasMore()) {
std::vector<std::vector<std::string>> responseTable;
......@@ -1652,6 +1654,7 @@ std::string XrdCtaFile::xCom_archivefile() {
}
cmdlineOutput << formatResponse(responseTable, hasOption("-h", "--header"));
}
*/
}
else { //summary
cta::common::dataStructures::ArchiveFileSummary summary=m_catalogue->getTapeFileSummary(searchCriteria);
......
......@@ -23,6 +23,7 @@
#include "common/log/SyslogLogger.hpp"
#include "common/optional.hpp"
#include "scheduler/Scheduler.hpp"
#include "xroot_plugins/ListArchiveFilesCmd.hpp"
#include "XrdSfs/XrdSfsInterface.hh"
......@@ -117,9 +118,9 @@ protected:
bool m_suppressOptionalOptionsWarning;
/**
* Set to true when the current command is to list archive files.
* Points to a ListArchiveFilesCmd object if the current command is to list archive files.
*/
bool m_listingArchiveFiles = false;
std::unique_ptr<cta::xrootPlugins::ListArchiveFilesCmd> m_listArchiveFilesCmd;
/**
* Reads the command result from the m_cmdlineOutput member variable.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment