Commit df9be572 authored by Victor Kotlyar's avatar Victor Kotlyar
Browse files

Ported commit cf6ad15d622d263025d40a275d9dcdff4d0b1067 from castor/master

CASTOR-5190 tapeserverd should have one disk file factory per disk
thread (instead of per thread pool)

    Moved the factory to the individual threads for disk read and
writes.
parent 27a9d90a
......@@ -40,7 +40,9 @@ DiskReadThreadPool::DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64
castor::tape::tapeserver::daemon::MigrationWatchDog & migrationWatchDog,
cta::log::LogContext lc, const std::string & remoteFileProtocol,
const std::string & xrootPrivateKeyPath, uint16_t xrootTimeout) :
m_diskFileFactory(remoteFileProtocol, xrootPrivateKeyPath, xrootTimeout),
m_remoteFileProtocol(remoteFileProtocol),
m_xrootPrivateKeyPath(xrootPrivateKeyPath),
m_xrootTimeout(xrootTimeout),
m_watchdog(migrationWatchDog),
m_lc(lc),m_maxFilesReq(maxFilesReq),
m_maxBytesReq(maxBytesReq), m_nbActiveThread(0) {
......@@ -166,7 +168,7 @@ void DiskReadThreadPool::DiskReadWorkerThread::run() {
task.reset( m_parent.popAndRequestMore(m_lc));
m_threadStat.waitInstructionsTime += localTime.secs(cta::utils::Timer::resetCounter);
if (NULL!=task.get()) {
task->execute(m_lc, m_parent.m_diskFileFactory,m_parent.m_watchdog);
task->execute(m_lc, m_diskFileFactory,m_parent.m_watchdog);
m_threadStat += task->getTaskStats();
}
else {
......
......@@ -139,19 +139,15 @@ private:
*/
cta::utils::Timer m_totalTime;
/**
* A disk file factory, that will create the proper type of file access class,
* depending on the received path
*/
diskFile::DiskFileFactory m_diskFileFactory;
/**
* Subclass of the thread pool's worker thread.
*/
class DiskReadWorkerThread: private cta::threading::Thread {
public:
DiskReadWorkerThread(DiskReadThreadPool & parent):
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc) {
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc),
m_diskFileFactory(parent.m_remoteFileProtocol, parent.m_xrootPrivateKeyPath,
parent.m_xrootTimeout){
cta::log::LogContext::ScopedParam param(m_lc, cta::log::Param("threadID", m_threadID));
m_lc.log(cta::log::INFO,"DisReadThread created");
}
......@@ -179,6 +175,12 @@ private:
/** The execution thread: pops and executes tasks (potentially asking for
more) and calls task injector's finish() on exit of the last thread. */
virtual void run();
/**
* A disk file factory, that will create the proper type of file access class,
* depending on the received path
*/
diskFile::DiskFileFactory m_diskFileFactory;
};
/** Container for the threads */
......@@ -187,6 +189,21 @@ private:
/** The queue of pointer to tasks to be executed. We own the tasks (they are
* deleted by the threads after execution) */
cta::threading::BlockingQueue<DiskReadTask *> m_tasks;
/**
* Parameter selecting the disk transfer protocol
*/
std::string m_remoteFileProtocol;
/**
* Parameter: path to xroot private key
*/
std::string m_xrootPrivateKeyPath;
/**
* Parameter: xroot timeout
*/
uint16_t m_xrootTimeout;
/**
* Reference to the watchdog, for error reporting.
......
......@@ -44,7 +44,9 @@ DiskWriteThreadPool::DiskWriteThreadPool(int nbThread,
const std::string & remoteFileProtocol,
const std::string & xrootPrivateKeyPath,
uint16_t xrootTimeout):
m_diskFileFactory(remoteFileProtocol, xrootPrivateKeyPath, xrootTimeout),
m_remoteFileProtocol(remoteFileProtocol),
m_xrootPrivateKeyPath(xrootPrivateKeyPath),
m_xrootTimeout(xrootTimeout),
m_reporter(report),m_watchdog(recallWatchDog),m_lc(lc)
{
m_lc.pushOrReplace(cta::log::Param("threadCount", nbThread));
......@@ -162,7 +164,7 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_threadStat.waitInstructionsTime+=localTime.secs(cta::utils::Timer::resetCounter);
if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc,
m_parentThreadPool.m_diskFileFactory, m_parentThreadPool.m_watchdog)) {
m_diskFileFactory, m_parentThreadPool.m_watchdog)) {
++m_parentThreadPool.m_failedWriteCount;
cta::log::ScopedParamContainer logParams(m_lc);
logParams.add("errorCount", m_parentThreadPool.m_failedWriteCount);
......
......@@ -105,19 +105,16 @@ private:
/** Thread safe counter for failed tasks */
cta::threading::AtomicCounter<int> m_failedWriteCount;
/**
* A disk file factory, that will create the proper type of file access class,
* depending on the received path
*/
diskFile::DiskFileFactory m_diskFileFactory;
/**
* Private class implementing the worker threads.
*/
class DiskWriteWorkerThread: private cta::threading::Thread {
public:
DiskWriteWorkerThread(DiskWriteThreadPool & manager):
m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),m_lc(m_parentThreadPool.m_lc)
m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),
m_lc(m_parentThreadPool.m_lc),
m_diskFileFactory(manager.m_remoteFileProtocol, manager.m_xrootPrivateKeyPath,
manager.m_xrootTimeout)
{
// This thread Id will remain for the rest of the thread's lifetime (and
// also context's lifetime) so ne need for a scope.
......@@ -148,8 +145,13 @@ private:
* For logging the event
*/
cta::log::LogContext m_lc;
/**
* A disk file factory, that will create the proper type of file access class,
* depending on the received path
*/
diskFile::DiskFileFactory m_diskFileFactory;
virtual void run();
};
/**
......@@ -181,6 +183,22 @@ private:
protected:
/** The (thread safe) queue of tasks */
cta::threading::BlockingQueue<DiskWriteTask*> m_tasks;
/**
* Parameter selecting the disk transfer protocol
*/
std::string m_remoteFileProtocol;
/**
* Parameter: path to xroot private key
*/
std::string m_xrootPrivateKeyPath;
/**
* Parameter: xroot timeout
*/
uint16_t m_xrootTimeout;
private:
/**
* Aggregate all threads' stats
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment