Commit a45a1761 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for file client switching in tapeserverd.

Tapeserverd now supports file://, root://, rfio:// and radosStriper:// URLs, on top of heuristic protocol guessing from the hostname:[/]path syntax.
The rados striper version is not yet fleshed out.
The xroot client used for this is the latest object oriented client (XrdCl::File).
parent f5fa4174
......@@ -49,4 +49,6 @@ message ForkDataTransfer {
required uint64 maxfilesbeforeflush = 21;
required uint32 diskthreadpoolsize = 22;
required uint32 rmcport = 23;
required string remotefileprotocol = 24;
}
......@@ -201,7 +201,8 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
rrp,
lc);
lc,
m_castorConf.tapeserverdRemoteFileProtocol);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,lc);
......@@ -289,7 +290,8 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
DiskReadThreadPool drtp(m_castorConf.tapeserverdDiskThreads,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
lc);
lc,
m_castorConf.tapeserverdRemoteFileProtocol);
MigrationTaskInjector mti(mm, drtp, twst, m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,lc);
......
......@@ -92,6 +92,7 @@ namespace daemon {
// Additions for tapeserverd
uint32_t tapeserverdDiskThreads;
std::string tapeserverdRemoteFileProtocol;
};
/**
* Constructor.
......
......@@ -43,7 +43,7 @@ m_nextTask(destination),m_migratedFile(file),
//------------------------------------------------------------------------------
// DiskReadTask::execute
//------------------------------------------------------------------------------
void DiskReadTask::execute(log::LogContext& lc) {
void DiskReadTask::execute(log::LogContext& lc, diskFile::diskFileFactory & fileFactory) {
using log::LogContext;
using log::Param;
......@@ -58,8 +58,9 @@ void DiskReadTask::execute(log::LogContext& lc) {
//so dont do the same mistake twice !
checkMigrationFailing();
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
if(migratingFileSize != sourceFile.size()){
std::auto_ptr<tape::diskFile::ReadFile> sourceFile(
fileFactory.createReadFile(m_migratedFile->path()));
if(migratingFileSize != sourceFile->size()){
throw castor::exception::Exception("Mismtach between size given by the client "
"and the real one");
}
......@@ -80,7 +81,7 @@ void DiskReadTask::execute(log::LogContext& lc) {
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileBlock = blockId++;
migratingFileSize -= mb->m_payload.read(sourceFile);
migratingFileSize -= mb->m_payload.read(*sourceFile);
m_stats.transferTime+=localTime.secs(utils::Timer::resetCounter);
m_stats.dataVolume += mb->m_payload.size();
......
......@@ -30,6 +30,7 @@
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/server/AtomicFlag.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/file/DiskFile.hpp"
namespace castor {
namespace tape {
......@@ -47,7 +48,7 @@ public:
tape::tapegateway::FileToMigrateStruct* file,size_t numberOfBlock,
castor::server::AtomicFlag& errorFlag);
void execute(log::LogContext& lc);
void execute(log::LogContext& lc, diskFile::diskFileFactory & fileFactory);
/**
* Return the stats of the tasks. Should be call after execute
* (otherwise, it is pointless)
......
......@@ -36,7 +36,9 @@ namespace daemon {
// DiskReadThreadPool constructor
//------------------------------------------------------------------------------
DiskReadThreadPool::DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
castor::log::LogContext lc) : m_lc(lc),m_maxFilesReq(maxFilesReq),m_maxBytesReq(maxBytesReq),m_nbActiveThread(0){
castor::log::LogContext lc, const std::string & remoteFileProtocol) :
m_diskFileFactory(remoteFileProtocol), m_lc(lc),m_maxFilesReq(maxFilesReq),
m_maxBytesReq(maxBytesReq), m_nbActiveThread(0) {
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
......@@ -151,7 +153,7 @@ void DiskReadThreadPool::DiskReadWorkerThread::run() {
task.reset( m_parent.popAndRequestMore(m_lc));
m_threadStat.waitInstructionsTime += localTime.secs(utils::Timer::resetCounter);
if (NULL!=task.get()) {
task->execute(m_lc);
task->execute(m_lc, m_parent.m_diskFileFactory);
m_threadStat += task->getTaskStats();
}
else {
......
/******************************************************************************
* DiskReadThreadPool.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/server/BlockingQueue.hpp"
#include "castor/server/Threading.hpp"
#include "castor/server/AtomicCounter.hpp"
#include "castor/log/LogContext.hpp"
#include <vector>
#include <stdint.h>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class MigrationTaskInjector;
class DiskReadThreadPool {
public:
/**
* Constructor. The constructor creates the threads for the pool, but does not
* start them.
* @param nbThread number of threads for reading files
* @param maxFilesReq maximal number of files we might require
* within a single request to the task injector
* @param maxBytesReq maximal number of bytes we might require
* within a single request to the task injector
* @param lc log context for logging purposes
*/
DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
castor::log::LogContext lc);
/**
* Destructor.
* Simply destroys the threads, which should have been joined by the caller
* beforehand using waitThreads()
*/
~DiskReadThreadPool();
/**
* Starts the threads which were created at construction time.
*/
void startThreads();
/**
* Waits for completion of all the threads. Should be called before the
* destructor.
*/
void waitThreads();
/**
* Adds a DiskReadTask to the thread pool's queue of tasks.
* @param task pointer to the new task. The thread pool takes ownership of the
* task and will delete it after execution. push() is not protected against races
* with finish() as the task injection is done from a single thread (the task
* injector)
*/
void push(DiskReadTask *task);
/**
* Injects as many "end" tasks as there are threads in the thread pool.
* A thread getting such an end task will exit. This method is called by the
* task injector at the end of the tape session. It is not race protected.
* See push()
*/
void finish();
/**
* Sets up the pointer to the task injector. This cannot be done at
* construction time as both task injector and read thread pool refer to
* each other. This function should be called before starting the threads.
* This is used for the feedback loop where the injector is requested to
* fetch more work by the read thread pool when the task queue of the thread
* pool starts to run low.
*/
void setTaskInjector(MigrationTaskInjector* injector){
m_injector = injector;
}
private:
/**
* When the last thread finishes, we log all m_pooldStat members plus a message
* at the given level
* @param level the log level for the summary message
* @param message the message to log together with the statistics
*/
void logWithStat(int level, const std::string& message);
/**
* Get the next task to execute and, if there are not enough tasks in the
* queue, ask the TaskInjector to fetch more jobs.
* @return the next task to execute
*/
DiskReadTask* popAndRequestMore(castor::log::LogContext & lc);
/**
* When a thread finishes, it calls this function to add its stats to the
* thread pool's aggregate
* @param stats the statistics of the finishing thread
*/
void addThreadStats(const DiskStats& stats);
/**
Protects addThreadStats from concurrent calls
*/
castor::server::Mutex m_statAddingProtection;
/**
* Aggregate of all the threads' stats
*/
DiskStats m_pooldStat;
/**
* Subclass of the thread pool's worker thread.
*/
class DiskReadWorkerThread: private castor::server::Thread {
public:
// Registers with the parent pool and draws a sequential thread ID from the
// pool's atomic thread counter.
DiskReadWorkerThread(DiskReadThreadPool & parent):
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc) {
log::LogContext::ScopedParam param(m_lc, log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"DisReadThread created");
}
void start() { castor::server::Thread::start(); }
void wait() { castor::server::Thread::wait(); }
private:
void logWithStat(int level, const std::string& message);
/*
* For measuring how long the different steps take
*/
DiskStats m_threadStat;
/** Pointer to the thread pool, allowing calls to popAndRequestMore,
* and calling finish() on the task injector when the last thread
* is finishing (thanks to the atomic counter m_parent.m_nbActiveThread) */
DiskReadThreadPool & m_parent;
/** The sequential ID of the thread, used in logs */
const int m_threadID;
/** The local copy of the log context, allowing race-free logging with context
between threads. */
castor::log::LogContext m_lc;
/** The execution thread: pops and executes tasks (potentially asking for
more) and calls task injector's finish() on exit of the last thread. */
virtual void run();
};
/** Container for the threads */
std::vector<DiskReadWorkerThread *> m_threads;
/** The queue of pointers to tasks to be executed. We own the tasks (they are
* deleted by the threads after execution) */
castor::server::BlockingQueue<DiskReadTask *> m_tasks;
/** The log context. This is copied on construction to prevent interferences
* between threads.
*/
castor::log::LogContext m_lc;
/** Pointer to the task injector allowing requests for more work, and
* termination signaling */
MigrationTaskInjector* m_injector;
/** The maximum number of files we ask for per request. This value is also used as
* a threshold (half of it, actually) to trigger the request for more work.
* Another request for more work is also triggered when the task FIFO gets empty.*/
const uint64_t m_maxFilesReq;
/** Same as m_maxFilesReq for the size per request. */
const uint64_t m_maxBytesReq;
/** An atomic (i.e. thread safe) counter of the current number of threads (they
are counted up at creation time and down at completion time) */
castor::server::AtomicCounter<int> m_nbActiveThread;
};
}}}}
/******************************************************************************
* DiskReadThreadPool.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/server/BlockingQueue.hpp"
#include "castor/server/Threading.hpp"
#include "castor/server/AtomicCounter.hpp"
#include "castor/log/LogContext.hpp"
#include <vector>
#include <stdint.h>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class MigrationTaskInjector;
class DiskReadThreadPool {
public:
/**
* Constructor. The constructor creates the threads for the pool, but does not
* start them.
* @param nbThread number of threads for reading files
* @param maxFilesReq maximal number of files we might require
* within a single request to the task injector
* @param maxBytesReq maximal number of bytes we might require
* within a single request to the task injector
* @param lc log context for logging purposes
* @param remoteFileProtocol name of the protocol used for accessing remote
* disk files; forwarded to m_diskFileFactory which picks the file access
* class accordingly (e.g. "RFIO" as used in the unit tests)
*/
DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
castor::log::LogContext lc, const std::string & remoteFileProtocol);
/**
* Destructor.
* Simply destroys the threads, which should have been joined by the caller
* beforehand using waitThreads()
*/
~DiskReadThreadPool();
/**
* Starts the threads which were created at construction time.
*/
void startThreads();
/**
* Waits for completion of all the threads. Should be called before the
* destructor.
*/
void waitThreads();
/**
* Adds a DiskReadTask to the thread pool's queue of tasks.
* @param task pointer to the new task. The thread pool takes ownership of the
* task and will delete it after execution. push() is not protected against races
* with finish() as the task injection is done from a single thread (the task
* injector)
*/
void push(DiskReadTask *task);
/**
* Injects as many "end" tasks as there are threads in the thread pool.
* A thread getting such an end task will exit. This method is called by the
* task injector at the end of the tape session. It is not race protected.
* See push()
*/
void finish();
/**
* Sets up the pointer to the task injector. This cannot be done at
* construction time as both task injector and read thread pool refer to
* each other. This function should be called before starting the threads.
* This is used for the feedback loop where the injector is requested to
* fetch more work by the read thread pool when the task queue of the thread
* pool starts to run low.
*/
void setTaskInjector(MigrationTaskInjector* injector){
m_injector = injector;
}
private:
/**
* When the last thread finishes, we log all m_pooldStat members plus a message
* at the given level
* @param level the log level for the summary message
* @param message the message to log together with the statistics
*/
void logWithStat(int level, const std::string& message);
/**
* Get the next task to execute and, if there are not enough tasks in the
* queue, ask the TaskInjector to fetch more jobs.
* @return the next task to execute
*/
DiskReadTask* popAndRequestMore(castor::log::LogContext & lc);
/**
* When a thread finishes, it calls this function to add its stats to the
* thread pool's aggregate
* @param stats the statistics of the finishing thread
*/
void addThreadStats(const DiskStats& stats);
/**
Protects addThreadStats from concurrent calls
*/
castor::server::Mutex m_statAddingProtection;
/**
* Aggregate of all the threads' stats
*/
DiskStats m_pooldStat;
/**
* A disk file factory, that will create the proper type of file access class,
* depending on the received path
*/
diskFile::diskFileFactory m_diskFileFactory;
/**
* Subclass of the thread pool's worker thread.
*/
class DiskReadWorkerThread: private castor::server::Thread {
public:
// Registers with the parent pool and draws a sequential thread ID from the
// pool's atomic thread counter.
DiskReadWorkerThread(DiskReadThreadPool & parent):
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc) {
log::LogContext::ScopedParam param(m_lc, log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"DisReadThread created");
}
void start() { castor::server::Thread::start(); }
void wait() { castor::server::Thread::wait(); }
private:
void logWithStat(int level, const std::string& message);
/*
* For measuring how long the different steps take
*/
DiskStats m_threadStat;
/** Pointer to the thread pool, allowing calls to popAndRequestMore,
* and calling finish() on the task injector when the last thread
* is finishing (thanks to the atomic counter m_parent.m_nbActiveThread) */
DiskReadThreadPool & m_parent;
/** The sequential ID of the thread, used in logs */
const int m_threadID;
/** The local copy of the log context, allowing race-free logging with context
between threads. */
castor::log::LogContext m_lc;
/** The execution thread: pops and executes tasks (potentially asking for
more) and calls task injector's finish() on exit of the last thread. */
virtual void run();
};
/** Container for the threads */
std::vector<DiskReadWorkerThread *> m_threads;
/** The queue of pointers to tasks to be executed. We own the tasks (they are
* deleted by the threads after execution) */
castor::server::BlockingQueue<DiskReadTask *> m_tasks;
/** The log context. This is copied on construction to prevent interferences
* between threads.
*/
castor::log::LogContext m_lc;
/** Pointer to the task injector allowing requests for more work, and
* termination signaling */
MigrationTaskInjector* m_injector;
/** The maximum number of files we ask for per request. This value is also used as
* a threshold (half of it, actually) to trigger the request for more work.
* Another request for more work is also triggered when the task FIFO gets empty.*/
const uint64_t m_maxFilesReq;
/** Same as m_maxFilesReq for the size per request. */
const uint64_t m_maxBytesReq;
/** An atomic (i.e. thread safe) counter of the current number of threads (they
are counted up at creation time and down at completion time) */
castor::server::AtomicCounter<int> m_nbActiveThread;
};
}}}}
......@@ -43,12 +43,14 @@ m_recallingFile(file),m_memManager(mm){
//------------------------------------------------------------------------------
// DiskWriteTask::execute
//------------------------------------------------------------------------------
bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
diskFile::diskFileFactory & fileFactory) {
using log::LogContext;
using log::Param;
utils::Timer localTime;
try{
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
std::auto_ptr<tape::diskFile::WriteFile> writeFile(
fileFactory.createWriteFile(m_recallingFile->path()));
m_stats.openingTime+=localTime.secs(utils::Timer::resetCounter);
int blockId = 0;
......@@ -70,7 +72,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
m_stats.dataVolume+=mb->m_payload.size();
mb->m_payload.write(ourFile);
mb->m_payload.write(*writeFile);
m_stats.transferTime+=localTime.secs(utils::Timer::resetCounter);
checksum = mb->m_payload.adler32(checksum);
......@@ -82,7 +84,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
//close has to be explicit, because it may throw.
//A close is done in WriteFile's destructor, but it may lead to some
//silent data loss
ourFile.close();
writeFile->close();
m_stats.closingTime +=localTime.secs(utils::Timer::resetCounter);
m_stats.filesCount++;
break;
......
......@@ -62,7 +62,8 @@ public:
* Main routine: takes each memory block in the fifo and writes it to disk
* @return true if the file has been successfully written false otherwise.
*/
virtual bool execute(RecallReportPacker& reporter,log::LogContext& lc) ;
virtual bool execute(RecallReportPacker& reporter,log::LogContext& lc,
diskFile::diskFileFactory & fileFactory) ;
/**
* Allows client code to return a reusable memory block. Should not been called
......
......@@ -37,6 +37,7 @@
namespace unitTests{
using namespace castor::tape::tapeserver::daemon;
using namespace castor::tape::tapeserver::client;
using namespace castor::tape::diskFile;
struct MockRecallReportPacker : public RecallReportPacker {
MOCK_METHOD3(reportCompletedJob,void(const FileStruct&,u_int32_t,u_int64_t));
MOCK_METHOD3(reportFailedJob, void(const FileStruct& ,const std::string&,int));
......@@ -56,6 +57,7 @@ namespace unitTests{
MockRecallReportPacker report(client,lc);
EXPECT_CALL(report,reportFailedJob(_,_,_));
RecallMemoryManager mm(10,100,lc);
diskFileFactory fileFactory("RFIO");
castor::tape::tapegateway::FileToRecallStruct file;
file.setPath("/dev/null");
......@@ -74,7 +76,7 @@ namespace unitTests{
t.pushDataBlock(mb);
t.pushDataBlock(NULL);
t.execute(report,lc);
t.execute(report,lc,fileFactory);
}
}
......@@ -34,8 +34,9 @@ namespace daemon {
// constructor
//------------------------------------------------------------------------------
DiskWriteThreadPool::DiskWriteThreadPool(int nbThread,
RecallReportPacker& report,castor::log::LogContext lc):
m_reporter(report),m_lc(lc)
RecallReportPacker& report,castor::log::LogContext lc,
const std::string & remoteFileProtocol):
m_diskFileFactory(remoteFileProtocol),m_reporter(report),m_lc(lc)
{
m_lc.pushOrReplace(castor::log::Param("threadCount", nbThread));
for(int i=0; i<nbThread; i++) {
......@@ -145,7 +146,8 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
task.reset(m_parentThreadPool.m_tasks.pop());
m_threadStat.waitInstructionsTime+=localTime.secs(utils::Timer::resetCounter);
if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc,
m_parentThreadPool.m_diskFileFactory)) {
++m_parentThreadPool.m_failedWriteCount;
ScopedParam sp(m_lc, Param("errorCount", m_parentThreadPool.m_failedWriteCount));
m_lc.log(LOG_ERR, "Task failed: counting another error for this session"</