diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.cpp index c136a2c5fea018ae099258a95d1bc76ef1c256f7..cd20fcf469747f7d17d662a962710c0fb1b170a7 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.cpp @@ -40,7 +40,9 @@ DiskReadThreadPool::DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64 castor::tape::tapeserver::daemon::MigrationWatchDog & migrationWatchDog, cta::log::LogContext lc, const std::string & remoteFileProtocol, const std::string & xrootPrivateKeyPath, uint16_t xrootTimeout) : - m_diskFileFactory(remoteFileProtocol, xrootPrivateKeyPath, xrootTimeout), + m_remoteFileProtocol(remoteFileProtocol), + m_xrootPrivateKeyPath(xrootPrivateKeyPath), + m_xrootTimeout(xrootTimeout), m_watchdog(migrationWatchDog), m_lc(lc),m_maxFilesReq(maxFilesReq), m_maxBytesReq(maxBytesReq), m_nbActiveThread(0) { @@ -166,7 +168,7 @@ void DiskReadThreadPool::DiskReadWorkerThread::run() { task.reset( m_parent.popAndRequestMore(m_lc)); m_threadStat.waitInstructionsTime += localTime.secs(cta::utils::Timer::resetCounter); if (NULL!=task.get()) { - task->execute(m_lc, m_parent.m_diskFileFactory,m_parent.m_watchdog); + task->execute(m_lc, m_diskFileFactory,m_parent.m_watchdog); m_threadStat += task->getTaskStats(); } else { diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp b/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp index 511d88b22b08b4e0fcedf9b6716daa6584c1c6a1..819d0cdae371032950c6dbafb82b8f646e7d14df 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp @@ -139,19 +139,15 @@ private: */ cta::utils::Timer m_totalTime; - /** - * A disk file factory, that will create the proper type of file access class, - * depending on the received path - */ - diskFile::DiskFileFactory m_diskFileFactory; - /** * Subclass of the thread pool's worker thread. */ class DiskReadWorkerThread: private cta::threading::Thread { public: DiskReadWorkerThread(DiskReadThreadPool & parent): - m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc) { + m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc), + m_diskFileFactory(parent.m_remoteFileProtocol, parent.m_xrootPrivateKeyPath, + parent.m_xrootTimeout){ cta::log::LogContext::ScopedParam param(m_lc, cta::log::Param("threadID", m_threadID)); m_lc.log(cta::log::INFO,"DisReadThread created"); } @@ -179,6 +175,12 @@ private: /** The execution thread: pops and executes tasks (potentially asking for more) and calls task injector's finish() on exit of the last thread. */ virtual void run(); + + /** + * A disk file factory, that will create the proper type of file access class, + * depending on the received path + */ + diskFile::DiskFileFactory m_diskFileFactory; }; /** Container for the threads */ @@ -187,6 +189,21 @@ private: /** The queue of pointer to tasks to be executed. We own the tasks (they are * deleted by the threads after execution) */ cta::threading::BlockingQueue<DiskReadTask *> m_tasks; + + /** + * Parameter selecting the disk transfer protocol + */ + std::string m_remoteFileProtocol; + + /** + * Parameter: path to xroot private key + */ + std::string m_xrootPrivateKeyPath; + + /** + * Parameter: xroot timeout + */ + uint16_t m_xrootTimeout; /** * Reference to the watchdog, for error reporting. diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp index f6560cc52377069787f02bb23176302b9be4745d..479f2c140e4499f2c1682001ab70f3a583d82c74 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp @@ -44,7 +44,9 @@ DiskWriteThreadPool::DiskWriteThreadPool(int nbThread, const std::string & remoteFileProtocol, const std::string & xrootPrivateKeyPath, uint16_t xrootTimeout): - m_diskFileFactory(remoteFileProtocol, xrootPrivateKeyPath, xrootTimeout), + m_remoteFileProtocol(remoteFileProtocol), + m_xrootPrivateKeyPath(xrootPrivateKeyPath), + m_xrootTimeout(xrootTimeout), m_reporter(report),m_watchdog(recallWatchDog),m_lc(lc) { m_lc.pushOrReplace(cta::log::Param("threadCount", nbThread)); @@ -162,7 +164,7 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() { m_threadStat.waitInstructionsTime+=localTime.secs(cta::utils::Timer::resetCounter); if (NULL!=task.get()) { if(false==task->execute(m_parentThreadPool.m_reporter,m_lc, - m_parentThreadPool.m_diskFileFactory, m_parentThreadPool.m_watchdog)) { + m_diskFileFactory, m_parentThreadPool.m_watchdog)) { ++m_parentThreadPool.m_failedWriteCount; cta::log::ScopedParamContainer logParams(m_lc); logParams.add("errorCount", m_parentThreadPool.m_failedWriteCount); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp index a7d5a1fbcf078cb034c1c3052156b470464c2be3..cbc42aba562a51b2b12f862249ebfe9554d489e9 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp @@ -105,19 +105,16 @@ private: /** Thread safe counter for failed tasks */ cta::threading::AtomicCounter<int> m_failedWriteCount; - /** - * A disk file factory, that will create the proper type of file access class, - * depending on the received path - */ - diskFile::DiskFileFactory m_diskFileFactory; - /** * Private class implementing the worker threads. */ class DiskWriteWorkerThread: private cta::threading::Thread { public: DiskWriteWorkerThread(DiskWriteThreadPool & manager): - m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),m_lc(m_parentThreadPool.m_lc) + m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager), + m_lc(m_parentThreadPool.m_lc), + m_diskFileFactory(manager.m_remoteFileProtocol, manager.m_xrootPrivateKeyPath, + manager.m_xrootTimeout) { // This thread Id will remain for the rest of the thread's lifetime (and // also context's lifetime) so ne need for a scope. @@ -148,8 +145,13 @@ private: * For logging the event */ cta::log::LogContext m_lc; - - + + /** + * A disk file factory, that will create the proper type of file access class, + * depending on the received path + */ + diskFile::DiskFileFactory m_diskFileFactory; + virtual void run(); }; /** @@ -181,6 +183,22 @@ private: protected: /** The (thread safe) queue of tasks */ cta::threading::BlockingQueue<DiskWriteTask*> m_tasks; + + /** + * Parameter selecting the disk transfer protocol + */ + std::string m_remoteFileProtocol; + + /** + * Parameter: path to xroot private key + */ + std::string m_xrootPrivateKeyPath; + + /** + * Parameter: xroot timeout + */ + uint16_t m_xrootTimeout; + private: /** * Aggregate all threads' stats