Commit 7922f8c7 authored by Eric Cano's avatar Eric Cano
Browse files

Catching up with tapeserver branch.

parents 2d02f5b3 ec487757
......@@ -37,6 +37,8 @@ namespace daemon {
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
m_lc.pushOrReplace(log::Param("threadID",i));
m_lc.log(LOG_INFO, "DiskReadWorkerThread created");
}
}
DiskReadThreadPool::~DiskReadThreadPool() {
......@@ -44,12 +46,14 @@ namespace daemon {
delete m_threads.back();
m_threads.pop_back();
}
m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads have been destroyed");
}
void DiskReadThreadPool::startThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
}
m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads are started");
}
void DiskReadThreadPool::waitThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
......@@ -59,6 +63,7 @@ namespace daemon {
}
void DiskReadThreadPool::push(DiskReadTaskInterface *t) {
m_tasks.push(t);
m_lc.log(LOG_INFO, "Push a task into the DiskReadThreadPool");
}
void DiskReadThreadPool::finish() {
/* Insert one endOfSession per thread */
......@@ -69,17 +74,21 @@ namespace daemon {
DiskReadTaskInterface* DiskReadThreadPool::popAndRequestMore(){
castor::tape::threading::BlockingQueue<DiskReadTaskInterface*>::valueRemainingPair
vrp = m_tasks.popGetSize();
log::LogContext::ScopedParam sp(m_lc, log::Param("m_maxFilesReq", m_maxFilesReq));
log::LogContext::ScopedParam sp0(m_lc, log::Param("m_maxBytesReq", m_maxBytesReq));
if(0==vrp.remaining){
m_injector->requestInjection(m_maxFilesReq, m_maxBytesReq,true);
m_injector->requestInjection(true);
m_lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (with last call)");
}else if(vrp.remaining + 1 == m_maxFilesReq/2){
m_injector->requestInjection(m_maxFilesReq, m_maxBytesReq,false);
m_injector->requestInjection(false);
m_lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (without last call)");
}
return vrp.value;
}
void DiskReadThreadPool::DiskReadWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
m_lc.log(LOG_DEBUG, "Starting DiskReadWorkerThread");
m_lc.pushOrReplace(log::Param("threadID",m_threadID));
m_lc.log(LOG_DEBUG, "DiskReadWorkerThread Running");
std::auto_ptr<DiskReadTaskInterface> task;
while(1) {
task.reset( m_parent.popAndRequestMore());
......@@ -94,9 +103,9 @@ namespace daemon {
// will hence be no more requests for more. (last thread turns off the light)
if (0 == --m_parent.m_nbActiveThread) {
m_parent.m_injector->finish();
m_lc.log(LOG_DEBUG, "Signaled to task injector the end of disk read threads");
m_lc.log(LOG_INFO, "Signaled to task injector the end of disk read threads");
}
m_lc.log(LOG_DEBUG, "Finishing of DiskReadWorkerThread");
m_lc.log(LOG_INFO, "Finishing of DiskReadWorkerThread");
}
}}}}
......
......@@ -63,42 +63,7 @@ namespace daemon {
bool DiskWriteThreadPool::crossingDownFileThreshod(int filesPopped) const {
return (m_tasks.size() >= m_maxFilesReq/2) && (m_tasks.size() - filesPopped < m_maxFilesReq/2);
}
DiskWriteTaskInterface * DiskWriteThreadPool::popAndRequestMoreJobs() {
using castor::log::LogContext;
using castor::log::Param;
DiskWriteTaskInterface * ret = m_tasks.pop();
// TODO: completely remove task injection in writers, move it to readers
if(ret)
{
castor::tape::threading::MutexLocker ml(&m_counterProtection);
// We are about to go to empty: request a last call job injection
if(m_tasks.size() == 1) {
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("files",m_tasks.size())),
LogContext::ScopedParam(m_lc, Param("ret->files", 1)),
LogContext::ScopedParam(m_lc, Param("maxFiles", m_maxFilesReq)),
LogContext::ScopedParam(m_lc, Param("maxBlocks", m_maxBytesReq))
};
tape::utils::suppresUnusedVariable(sp);
m_lc.log(LOG_INFO, "In DiskWriteTaskInterface::popAndRequestMoreJobs(), requesting last call");
//if we are below mid on both block and files and we are crossing a threshold
//on either files of blocks, then request more jobs
} else if ( belowMidFilesAfterPop(1) && crossingDownFileThreshod(1)) {
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("files",m_tasks.size())),
LogContext::ScopedParam(m_lc, Param("ret->files", 1)),
LogContext::ScopedParam(m_lc, Param("maxFiles", m_maxFilesReq)),
LogContext::ScopedParam(m_lc, Param("maxBlocks", m_maxBytesReq))
};
tape::utils::suppresUnusedVariable(sp);
m_lc.log(LOG_INFO, "In DiskWriteTaskInterface::popAndRequestMoreJobs(), requesting: files");
}
}
return ret;
}
void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "diskWrite"));
m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread");
......
......@@ -57,12 +57,6 @@ private:
bool belowMidFilesAfterPop(int filesPopped) const ;
bool crossingDownFileThreshod(int filesPopped) const;
/**
* Pop a task from m_tasks.
* TODO The loopBack part (AndRequestMoreJob) should move to TapeReadSingleSthread
* @return
*/
DiskWriteTaskInterface * popAndRequestMoreJobs() ;
tape::threading::AtomicCounter<int> m_nbActiveThread;
tape::threading::AtomicCounter<int> m_failedWriteCount;
......
......@@ -59,7 +59,7 @@ public:
m_totalMemoryAllocated+=blockSize;
m_lc.pushOrReplace(log::Param("blockId",i));
m_lc.log(LOG_INFO,"MigrationMemoryManager Created a block");
m_lc.log(LOG_DEBUG,"MigrationMemoryManager Created a block");
}
m_lc.log(LOG_INFO,"MigrationMemoryManager: all blocks have been created");
}
......@@ -133,14 +133,12 @@ public:
// castor::tape::threading::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
try {
while(true) {
delete m_freeBlocks.tryPop();
}
}
catch (castor::tape::threading::noMore) {
//done
}
castor::tape::threading::BlockingQueue<MemBlock*>::valueRemainingPair ret;
do{
ret=m_freeBlocks.popGetSize();
delete ret.value;
}while(ret.remaining>0);
m_lc.log(LOG_INFO,"MigrationMemoryManager destruction : all memory blocks have been deleted");
}
private:
......
......@@ -46,9 +46,10 @@ namespace daemon {
MigrationTaskInjector::MigrationTaskInjector(MigrationMemoryManager & mm,
DiskThreadPoolInterface<DiskReadTaskInterface> & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
castor::log::LogContext lc):
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc):
m_thread(*this),m_memManager(mm),m_tapeWriter(tapeWriter),
m_diskReader(diskReader),m_client(client),m_lc(lc)
m_diskReader(diskReader),m_client(client),m_lc(lc),
m_maxFiles(maxFiles), m_maxByte(byteSizeThreshold)
{
}
......@@ -101,19 +102,18 @@ namespace daemon {
m_thread.start();
}
void MigrationTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
void MigrationTaskInjector::requestInjection( bool lastCall) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
if(!m_errorFlag) {
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
m_queue.push(Request(m_maxFiles, m_maxByte, lastCall));
}
}
bool MigrationTaskInjector::synchronousInjection(uint64_t maxFiles,
uint64_t byteSizeThreshold) {
bool MigrationTaskInjector::synchronousInjection() {
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList>
filesToMigrateList(m_client.getFilesToMigrate(maxFiles,
byteSizeThreshold,reqReport));
filesToMigrateList(m_client.getFilesToMigrate(m_maxFiles,
m_maxByte,reqReport));
if(NULL == filesToMigrateList.get()) {
m_lc.log(LOG_ERR, "No files to migrate: empty mount");
return false;
......@@ -132,7 +132,7 @@ namespace daemon {
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(Param("thread", "MigrationTaskInjector"));
m_parent.m_lc.log(LOG_DEBUG, "Starting MigrationTaskInjector thread");
m_parent.m_lc.log(LOG_INFO, "Starting MigrationTaskInjector thread");
try{
while(1){
if(m_parent.m_errorFlag){
......@@ -171,7 +171,7 @@ namespace daemon {
}
} // end of while(1)
//-------------
m_parent.m_lc.log(LOG_DEBUG, "Finishing MigrationTaskInjector thread");
m_parent.m_lc.log(LOG_INFO, "Finishing MigrationTaskInjector thread");
/* We want to finish at the first lastCall we encounter.
* But even after sending finish() to m_diskWriter and to m_tapeReader,
* m_diskWriter might still want some more task (the threshold could be crossed),
......
......@@ -49,7 +49,7 @@ public:
MigrationTaskInjector(MigrationMemoryManager & mm,
DiskThreadPoolInterface<DiskReadTaskInterface> & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
castor::log::LogContext lc);
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc);
/**
......@@ -77,7 +77,7 @@ public:
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
void requestInjection(bool lastCall);
/**
* Contact the client to make sure there are really something to do
......@@ -87,7 +87,7 @@ public:
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold);
bool synchronousInjection();
/**
* Send an end token in the request queue. There should be no subsequent
......@@ -156,6 +156,12 @@ private:
castor::tape::threading::BlockingQueue<Request> m_queue;
castor::tape::threading::AtomicFlag m_errorFlag;
//maximal number of files requested. at once
const uint64_t m_maxFiles;
//maximal number of cumulated byte requested. at once
const uint64_t m_maxByte;
};
} //end namespace daemon
......
......@@ -145,13 +145,14 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
rrp,
lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy, lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,lc);
trst.setTaskInjector(&rti);
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
if (rti.synchronousInjection(m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
if (rti.synchronousInjection()) {
// We got something to recall. Time to start the machinery
trst.startThreads();
dwtp.startThreads();
......@@ -211,9 +212,10 @@ void castor::tape::tapeserver::daemon::MountSession::executeWrite(LogContext & l
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
lc);
MigrationTaskInjector mti(mm, drtp, twst, m_clientProxy, lc);
if (mti.synchronousInjection(m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles)) {
MigrationTaskInjector mti(mm, drtp, twst, m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,lc);
if (mti.synchronousInjection()) {
// We have something to do: start the session by starting all the
// threads.
mm.startThreads();
......
......@@ -55,7 +55,7 @@ public:
m_freeBlocks.push(new MemBlock(i, blockSize));
m_lc.pushOrReplace(log::Param("blockId",i));
m_lc.log(LOG_INFO,"RecallMemoryManager created a block");
m_lc.log(LOG_DEBUG,"RecallMemoryManager created a block");
}
m_lc.log(LOG_INFO,"RecallMemoryManager: all blocks have been created");
}
......@@ -92,14 +92,13 @@ public:
// castor::tape::threading::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
try {
while(true) {
delete m_freeBlocks.tryPop();
}
}
catch (castor::tape::threading::noMore) {
//done
}
castor::tape::threading::BlockingQueue<MemBlock*>::valueRemainingPair ret;
do{
ret=m_freeBlocks.popGetSize();
delete ret.value;
}while(ret.remaining>0);
m_lc.log(LOG_INFO,"RecallMemoryManager destruction : all memory blocks have been deleted");
}
......
......@@ -25,10 +25,11 @@ namespace daemon {
RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTaskInterface> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,
client::ClientInterface& client,castor::log::LogContext lc) :
client::ClientInterface& client,
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc) :
m_thread(*this),m_memManager(mm),
m_tapeReader(tapeReader),m_diskWriter(diskWriter),
m_client(client),m_lc(lc)
m_client(client),m_lc(lc),m_maxFiles(maxFiles),m_byteSizeThreshold(byteSizeThreshold)
{}
void RecallTaskInjector::finish(){
......@@ -36,10 +37,10 @@ void RecallTaskInjector::finish(){
m_queue.push(Request());
}
void RecallTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
void RecallTaskInjector::requestInjection(bool lastCall) {
//@TODO where shall we acquire the lock ? There of just before the push ?
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
m_queue.push(Request(m_maxFiles, m_byteSizeThreshold, lastCall));
}
void RecallTaskInjector::waitThreads() {
......@@ -82,15 +83,15 @@ void RecallTaskInjector::injectBulkRecalls(const std::vector<castor::tape::tapeg
m_lc.log(LOG_INFO, "Tasks for recalling injected");
}
bool RecallTaskInjector::synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold)
bool RecallTaskInjector::synchronousInjection()
{
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<castor::tape::tapegateway::FilesToRecallList>
filesToRecallList(m_client.getFilesToRecall(maxFiles,byteSizeThreshold,reqReport));
filesToRecallList(m_client.getFilesToRecall(m_maxFiles,m_byteSizeThreshold,reqReport));
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("maxFiles", maxFiles)),
LogContext::ScopedParam(m_lc, Param("byteSizeThreshold",byteSizeThreshold)),
LogContext::ScopedParam(m_lc, Param("maxFiles", m_maxFiles)),
LogContext::ScopedParam(m_lc, Param("byteSizeThreshold",m_byteSizeThreshold)),
LogContext::ScopedParam(m_lc, Param("transactionId", reqReport.transactionId)),
LogContext::ScopedParam(m_lc, Param("connectDuration", reqReport.connectDuration)),
LogContext::ScopedParam(m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration))
......
......@@ -24,9 +24,9 @@
#pragma once
#include <list>
#include <stdint.h>
#include <list>
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
......@@ -51,7 +51,7 @@ public:
RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTaskInterface> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,client::ClientInterface& client,
castor::log::LogContext lc);
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc);
/**
......@@ -59,12 +59,11 @@ public:
* TapeReadSingleThread. When TapeReadSingleThread::popAndRequestMoreJobs detects
* it has not enough jobs to do to, it is class to push a request
* in order to (try) fill up the queue.
* @param maxFiles files count requested.
* @param maxBlocks total bytes count at least requested
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
virtual void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
virtual void requestInjection(bool lastCall);
/**
* Send an end token in the request queue. There should be no subsequent
......@@ -80,7 +79,7 @@ public:
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold);
bool synchronousInjection();
/**
* Wait for the inner thread to finish
......@@ -108,14 +107,13 @@ private:
*/
class Request {
public:
Request(int mf, int mb, bool lc):
Request(uint64_t mf, uint64_t mb, bool lc):
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc),end(false) {}
Request():
nbMaxFiles(-1), byteSizeThreshold(-1), lastCall(true),end(true) {}
const int nbMaxFiles;
const int byteSizeThreshold;
nbMaxFiles(0), byteSizeThreshold(0), lastCall(true),end(true) {}
const uint64_t nbMaxFiles;
const uint64_t byteSizeThreshold;
/**
* True if it is the last call for the set of requests :it means
......@@ -149,6 +147,13 @@ private:
castor::tape::threading::Mutex m_producerProtection;
castor::tape::threading::BlockingQueue<Request> m_queue;
//maximal number of files requested. at once
const uint64_t m_maxFiles;
//maximal number of cumulated byte requested. at once
const uint64_t m_byteSizeThreshold;
};
} //end namespace daemon
......
......@@ -66,15 +66,15 @@ TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNominal) {
FakeDiskWriteThreadPool diskWrite;
FakeSingleTapeReadThread tapeRead(drive, "V12345", lc);
tapeserver::daemon::RecallReportPacker rrp(client,2,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,6,blockSize,lc);
ASSERT_EQ(true,rti.synchronousInjection(6,blockSize));
ASSERT_EQ(true,rti.synchronousInjection());
ASSERT_EQ(nbFile,diskWrite.m_tasks.size());
ASSERT_EQ(nbFile,tapeRead.m_tasks.size());
rti.startThreads();
rti.requestInjection(6,blockSize,false);
rti.requestInjection(6,blockSize,true);
rti.requestInjection(false);
rti.requestInjection(true);
rti.finish();
rti.waitThreads();
......@@ -110,9 +110,9 @@ TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNoFiles) {
FakeSingleTapeReadThread tapeRead(drive, "V12345", lc);
tapeserver::daemon::RecallReportPacker rrp(client,2,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,6,blockSize,lc);
ASSERT_EQ(false,rti.synchronousInjection(6,blockSize));
ASSERT_EQ(false,rti.synchronousInjection());
ASSERT_EQ(0U,diskWrite.m_tasks.size());
ASSERT_EQ(0U,tapeRead.m_tasks.size());
}
......
......@@ -54,11 +54,11 @@ private:
// (the remaining value is after pop)
if(vrp.remaining + 1 == m_maxFilesRequest/2) {
// This is not a last call
m_taskInjector->requestInjection(m_maxFilesRequest, 1000, false);
m_taskInjector->requestInjection(false);
} else if (0 == vrp.remaining) {
// This is a last call: if the task injector comes up empty on this
// one, he'll call it the end.
m_taskInjector->requestInjection(m_maxFilesRequest, 1000, true);
m_taskInjector->requestInjection(true);
}
return vrp.value;
}
......
......@@ -31,7 +31,7 @@ namespace daemon {
class TaskInjector{
public:
virtual void requestInjection(int maxFiles, int maxBlocks, bool lastCall) = 0;
virtual void requestInjection(bool lastCall) = 0;
virtual void finish() = 0;
virtual ~TaskInjector() {}
};
......
......@@ -15,17 +15,7 @@
namespace castor {
namespace tape {
namespace threading {
/**
* Exception class used to signal there are no more elements
*/
class noMore : public castor::tape::Exception
{
public:
noMore():Exception("")
{}
};
/***
* This simple class provides a thread-safe blocking queue
*
......@@ -56,22 +46,19 @@ public:
m_sem.acquire();
return popCriticalSection();
}
/**
* Atomically pop the element of the top of the pile AND return it with the
* number of remaining elements in the queue
* @return a struct holding the popped element (into ret.value) and the number of elements
* remaining (into ret.remaining)
*
*/
valueRemainingPair popGetSize () {
m_sem.acquire();
valueRemainingPair ret;
ret.value = popCriticalSection(&ret.remaining);
return ret;
}
/**
* Return the next value of the queue and remove it
* if there are no elements, throws a no noMoreElements exception
*/
C tryPop() {
if (!m_sem.tryAcquire()) throw noMore();
return popCriticalSection();
}
///return the number of elements currently in the queue
size_t size() const {
......
......@@ -137,10 +137,5 @@
{
helgrind-unitTest-castor-basicStringslength
Helgrind:Race
fun: _ZNKSs6*
}
{
helgrind-unitTest-castor-basicStringsMutate
Helgrind:Race
fun: _ZNSs9_M_mutateEjjj
fun:_ZNKSs6*
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment