Commit 635531eb authored by David COME's avatar David COME
Browse files

Set up loopback for MigrationTaskInjector in DiskReadThreadPool

setTaskInjector, popAndRequestMore have been created and m_maxBytesReq,m_maxFilesReq have been added to be able to call TaskInjector::requestInjection
parent d7e12c46
......@@ -25,13 +25,14 @@
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include <memory>
#include <sstream>
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
DiskReadThreadPool::DiskReadThreadPool(int nbThread,castor::log::LogContext lc) : m_lc(lc){
DiskReadThreadPool::DiskReadThreadPool(int nbThread, int m_maxFilesReq,int m_maxBytesReq,
castor::log::LogContext lc) : m_lc(lc){
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
......@@ -64,20 +65,33 @@ namespace daemon {
m_tasks.push(NULL);
}
}
DiskReadTaskInterface* DiskReadThreadPool::popAndRequestMore(){
DiskReadTaskInterface* ret=m_tasks.pop();
castor::tape::threading::TryMutexLocker locker(&m_loopBackMutex);
if(locker)
{
const int remainningTasks = m_tasks.size();
if(1==remainningTasks){
m_injector->requestInjection(m_maxFilesReq, m_maxBytesReq,true);
}else if(remainningTasks <= m_maxFilesReq/2){
m_injector->requestInjection(m_maxFilesReq, m_maxBytesReq,false);
}
}
return ret;
}
void DiskReadThreadPool::DiskReadWorkerThread::run() {
std::auto_ptr<DiskReadTaskInterface> task;
while(1) {
task.reset( _this.m_tasks.pop());
if (NULL!=task.get())
task.reset( _this.m_tasks.popAndRequestMore());
if (NULL!=task.get()) {
task->execute(lc);
else
}
else {
break;
}
}
} //end of while(1)
}
tape::threading::AtomicCounter<int> DiskReadThreadPool::DiskReadWorkerThread::m_nbActiveThread(0);
}}}}
......@@ -29,6 +29,7 @@
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/daemon/DiskThreadPoolInterface.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/log/LogContext.hpp"
#include <vector>
......@@ -36,16 +37,20 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class MigrationTaskInjector;
class DiskReadThreadPool : public DiskThreadPoolInterface<DiskReadTaskInterface> {
public:
DiskReadThreadPool(int nbThread,castor::log::LogContext lc);
DiskReadThreadPool(int nbThread, int m_maxFilesReq,int m_maxBytesReq,
castor::log::LogContext lc);
~DiskReadThreadPool();
void startThreads();
void waitThreads();
virtual void push(DiskReadTaskInterface *t);
void finish();
void setTaskInjector(MigrationTaskInjector* injector){
m_injector = injector;
}
DiskReadTaskInterface* popAndRequestMore();
private:
class DiskReadWorkerThread: private castor::tape::threading::Thread {
public:
......@@ -66,6 +71,10 @@ private:
};
std::vector<DiskReadWorkerThread *> m_threads;
castor::log::LogContext m_lc;
MigrationTaskInjector* m_injector;
int m_maxFilesReq;
int m_maxBytesReq;
tape::threading::Mutex m_loopBackMutex;
};
}}}}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment