/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2003-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "disk/RadosStriperPool.hpp"
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/AtomicCounter.hpp"
#include "common/log/LogContext.hpp"
#include "common/Timer.hpp"
#include
#include
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class MigrationTaskInjector;
class DiskReadThreadPool {
public:
/**
* Constructor. The constructor creates the threads for the pool, but does not
* start them.
* @param nbThread Number of thread for reading files
* @param maxFilesReq maximal number of files we might require
* within a single request to the task injector
* @param maxBytesReq maximal number of bytes we might require
* within a single request a single request to the task injector
* @param lc log context for logging purpose
*/
DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
castor::tape::tapeserver::daemon::MigrationWatchDog & migrationWatchDog,
cta::log::LogContext lc, const std::string & xrootPrivateKeyPath,
uint16_t xrootTimeout);
/**
* Destructor.
* Simply destroys the thread, which should have been joined by the caller
* before using waitThreads()
*/
~DiskReadThreadPool();
/**
* Starts the threads which were created at construction time.
*/
void startThreads();
/**
* Waits for threads completion of all threads. Should be called before
* destructor
*/
void waitThreads();
/**
* Adds a DiskReadTask to the tape pool's queue of tasks.
* @param task pointer to the new task. The thread pool takes ownership of the
* task and will delete it after execution. Push() is not protected against races
* with finish() as the task injection is done from a single thread (the task
* injector)
*/
void push(DiskReadTask *task);
/**
* Injects as many "end" tasks as there are threads in the thread pool.
* A thread getting such an end task will exit. This method is called by the
* task injector at the end of the tape session. It is not race protected.
* See push()
*/
void finish();
/**
* Sets up the pointer to the task injector. This cannot be done at
* construction time as both task injector and read thread pool refer to
* each other. This function should be called before starting the threads.
* This is used for the feedback loop where the injector is requested to
* fetch more work by the read thread pool when the task queue of the thread
* pool starts to run low.
*/
void setTaskInjector(MigrationTaskInjector* injector){
m_injector = injector;
}
private:
/**
* When the last thread finish, we log all m_pooldStat members + message
* at the given level
* @param level
* @param message
*/
void logWithStat(int level, const std::string& message);
/**
* Get the next task to execute and if there is not enough tasks in queue,
* it will ask the TaskInjector to get more jobs.
* @return the next task to execute
*/
DiskReadTask* popAndRequestMore(cta::log::LogContext & lc);
/**
* When a thread finishm it call this function to Add its stats to one one of the
* Threadpool
* @param threadStats
*/
void addThreadStats(const DiskStats& stats);
/**
To protect addThreadStats from concurrent calls
*/
cta::threading::Mutex m_statAddingProtection;
/**
* Aggregate all threads' stats
*/
DiskStats m_pooldStat;
/**
* Measure the thread pool's lifetime
*/
cta::utils::Timer m_totalTime;
/**
* Subclass of the thread pool's worker thread.
*/
class DiskReadWorkerThread: private cta::threading::Thread {
public:
DiskReadWorkerThread(DiskReadThreadPool & parent):
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc),
m_diskFileFactory(parent.m_xrootPrivateKeyPath,
parent.m_xrootTimeout, parent.m_striperPool){
cta::log::LogContext::ScopedParam param(m_lc, cta::log::Param("threadID", m_threadID));
m_lc.log(cta::log::INFO,"DisReadThread created");
}
void start() { cta::threading::Thread::start(); }
void wait() { cta::threading::Thread::wait(); }
private:
void logWithStat(int level, const std::string& message);
/*
* For measuring how long are the the different steps
*/
DiskStats m_threadStat;
/** Pointer to the thread pool, allowing calls to popAndRequestMore,
* and calling finish() on the task injector when the last thread
* is finishing (thanks to the actomic counter m_parent.m_nbActiveThread) */
DiskReadThreadPool & m_parent;
/** The sequential ID of the thread, used in logs */
const int m_threadID;
/** The local copy of the log context, allowing race-free logging with context
between threads. */
cta::log::LogContext m_lc;
/** The execution thread: pops and executes tasks (potentially asking for
more) and calls task injector's finish() on exit of the last thread. */
virtual void run();
/**
* A disk file factory, that will create the proper type of file access class,
* depending on the received path
*/
cta::disk::DiskFileFactory m_diskFileFactory;
};
/** Container for the threads */
std::vector m_threads;
/** The queue of pointer to tasks to be executed. We own the tasks (they are
* deleted by the threads after execution) */
cta::threading::BlockingQueue m_tasks;
/**
* Parameter: path to xroot private key
*/
std::string m_xrootPrivateKeyPath;
/**
* Parameter: xroot timeout
*/
uint16_t m_xrootTimeout;
/**
* A pool of rados striper connections, to be shared by all threads
*/
cta::disk::RadosStriperPool m_striperPool;
/**
* Reference to the watchdog, for error reporting.
*/
castor::tape::tapeserver::daemon::MigrationWatchDog & m_watchdog;
/** The log context. This is copied on construction to prevent interferences
* between threads.
*/
cta::log::LogContext m_lc;
/** Pointer to the task injector allowing request for more work, and
* termination signaling */
MigrationTaskInjector* m_injector;
/** The maximum number of files we ask per request. This value is also used as
* a threshold (half of it, indeed) to trigger the request for more work.
* Another request for more work is also triggered when the task FIFO gets empty.*/
const uint64_t m_maxFilesReq;
/** Same as m_maxFilesReq for size per request. */
const uint64_t m_maxBytesReq;
/** An atomic (i.e. thread safe) counter of the current number of thread (they
are counted up at creation time and down at completion time) */
cta::threading::AtomicCounter m_nbActiveThread;
};
}}}}