/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2003-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/AtomicCounter.hpp"
#include "common/log/LogContext.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "disk/RadosStriperPool.hpp"
#include "common/Timer.hpp"
#include <string>
#include <vector>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
 * Container for the threads that will execute the disk write tasks of a
 * recall session. Results are reported through a RecallReportPacker and
 * errors through a RecallWatchDog.
 */
class DiskWriteThreadPool {
public:
  /**
   * Constructor: we create the thread structures here, but they do not get
   * started yet.
   * @param nbThread Fixed number of threads in the pool
   * @param reportPacker Reference to a previously created recall
   * report packer, to which the tasks will report their results.
   * @param recallWatchDog Reference to the session watchdog, to which errors
   * can be reported.
   * @param lc log context object, taken by value, that will be copied at
   * construction time (and then copied further for each thread). There will
   * be no side effect on the caller's logs.
   * @param xrootPrivateKeyPath the path to the xroot private key file.
   * @param xrootTimeout the xroot timeout handed to each worker's disk file
   * factory (the unit is not stated in this header).
   */
  DiskWriteThreadPool(int nbThread,
                      RecallReportPacker& reportPacker,
                      RecallWatchDog& recallWatchDog,
                      cta::log::LogContext lc,
                      const std::string & xrootPrivateKeyPath,
                      uint16_t xrootTimeout);

  /**
   * Destructor: we suppose the threads are not running (waitThreads() should
   * be called before destruction, unless the threads were never started).
   */
  virtual ~DiskWriteThreadPool();

  /**
   * Starts the threads created at construction time.
   */
  void startThreads();

  /**
   * Waits for completion of all the pool's threads.
   */
  void waitThreads();

  /**
   * Pushes a pointer to a task. The thread pool owns the task and will
   * de-allocate it.
   * @param t pointer to the task
   */
  void push(DiskWriteTask *t);

  /**
   * Signals to the thread pool that there will be no more tasks pushed to it,
   * and that the threads can therefore complete.
   */
  void finish();

private:
  /**
   * Running counter of active threads, used to determine which thread is the
   * last. Each worker also takes its thread ID from it at construction time.
   */
  // NOTE(review): the template argument was missing in this copy of the
  // header; <int> is inferred from "const int m_threadID" being initialised
  // from this counter. Confirm against AtomicCounter.hpp.
  cta::threading::AtomicCounter<int> m_nbActiveThread;

  /** Thread safe counter for failed tasks */
  // NOTE(review): <int> restored by analogy with m_nbActiveThread; confirm.
  cta::threading::AtomicCounter<int> m_failedWriteCount;

  /**
   * Private class implementing the worker threads. The Thread base is
   * inherited privately; only start() and wait() are re-exposed.
   */
  class DiskWriteWorkerThread: private cta::threading::Thread {
  public:
    /**
     * Builds a worker bound to its pool. The thread ID is obtained by
     * post-incrementing the pool's m_nbActiveThread counter, so constructing
     * a worker modifies the pool. The log context is copied from the pool.
     * @param manager the owning thread pool
     */
    explicit DiskWriteWorkerThread(DiskWriteThreadPool & manager):
      m_threadID(manager.m_nbActiveThread++),
      m_parentThreadPool(manager),
      m_lc(m_parentThreadPool.m_lc),
      m_diskFileFactory(manager.m_xrootPrivateKeyPath,
                        manager.m_xrootTimeout,
                        manager.m_striperPool)
    {
      // This thread ID will remain for the rest of the thread's lifetime (and
      // also the context's lifetime), so there is no need for a scope.
      m_lc.pushOrReplace(cta::log::Param("threadID", m_threadID));
      m_lc.log(cta::log::INFO,"DiskWrite Thread created");
    }

    /** Forwards to cta::threading::Thread::start(). */
    void start() { cta::threading::Thread::start(); }

    /** Forwards to cta::threading::Thread::wait(). */
    void wait() { cta::threading::Thread::wait(); }

  private:
    /**
     * Logs a message at the given level (defined outside this header;
     * judging by the name, together with this thread's statistics).
     * @param level the log level
     * @param message the message to log
     */
    void logWithStat(int level, const std::string& message);

    /**
     * For measuring how long the different steps take
     */
    DiskStats m_threadStat;

    /**
     * To identify the thread
     */
    const int m_threadID;

    /**
     * The owning thread pool
     */
    DiskWriteThreadPool & m_parentThreadPool;

    /**
     * For logging the events
     */
    cta::log::LogContext m_lc;

    /**
     * A disk file factory, that will create the proper type of file access
     * class, depending on the received path
     */
    cta::disk::DiskFileFactory m_diskFileFactory;

    /**
     * The thread's body (defined outside this header); presumably the
     * implementation of cta::threading::Thread's run() hook.
     */
    virtual void run();
  };

  /**
   * When a thread finishes, it calls this function to add its stats to the
   * ones of the thread pool.
   * @param threadStats the finishing thread's statistics
   */
  void addThreadStats(const DiskStats& threadStats);

  /**
   * When the last thread finishes, we log all m_pooldStat members + message
   * at the given level
   * @param level the log level
   * @param message the message to log
   */
  void logWithStat(int level, const std::string& message);

  /** The actual container for the thread objects */
  // NOTE(review): the element type was missing in this copy of the header;
  // a pointer to the worker class is assumed (the worker holds a reference
  // and a const member, so it cannot be assigned). Confirm against the .cpp.
  std::vector<DiskWriteWorkerThread *> m_threads;

  /**
   * Mutex protecting the pushers of new tasks from having the object deleted
   * under their feet.
   */
  cta::threading::Mutex m_pusherProtection;

  /**
   * To protect addThreadStats from concurrent calls
   */
  cta::threading::Mutex m_statAddingProtection;

protected:
  /** The (thread safe) queue of tasks */
  // NOTE(review): the element type was missing in this copy of the header;
  // DiskWriteTask* is inferred from push(DiskWriteTask*). Confirm.
  cta::threading::BlockingQueue<DiskWriteTask *> m_tasks;

  /**
   * Parameter: path to xroot private key
   */
  std::string m_xrootPrivateKeyPath;

  /**
   * Parameter: xroot timeout
   */
  uint16_t m_xrootTimeout;

  /**
   * A pool of rados striper connections, to be shared by all threads
   */
  cta::disk::RadosStriperPool m_striperPool;

private:
  /**
   * Aggregate all threads' stats
   */
  DiskStats m_pooldStat;

  /**
   * Measure the thread pool's lifetime
   */
  cta::utils::Timer m_totalTime;

  /**
   * Reference to the report packer where tasks report the result of their
   * individual files and the end of session (for the last thread)
   */
  RecallReportPacker& m_reporter;

  /**
   * Reference to the session watchdog, allowing reporting of errors to it.
   */
  RecallWatchDog& m_watchdog;

  /** Logging context that will be copied by each thread for individual context */
  cta::log::LogContext m_lc;
};
}}}}