Commit 02612e3e authored by David COME's avatar David COME
Browse files

Split DiskWriteTask, DiskReadThreadPool and DiskReadTask into hpp/cpp files instead of a single hpp

parent fc68b1bc
......@@ -29,7 +29,10 @@ add_executable(tapeserver-mm
RecallTaskInjector.cpp
../client/ClientProxy.cpp
MigrationReportPacker.cpp
DiskWriteThreadPool.cpp)
DiskWriteThreadPool.cpp
DiskReadThreadPool.cpp
DiskReadTask.cpp
DiskWriteTask.cpp)
target_link_libraries(tapeserver-mm
TapeDrive Exception SCSI System Utils File
......@@ -42,7 +45,10 @@ add_library(tapeserver
RecallTaskInjector.cpp
MigrationReportPacker.cpp
RecallReportPacker.cpp
DiskWriteThreadPool.cpp)
DiskWriteThreadPool.cpp
DiskReadThreadPool.cpp
DiskReadTask.cpp
DiskWriteTask.cpp)
add_library(tapeserverdTest
......
/******************************************************************************
* DiskReadTask.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/log/LogContext.hpp"
namespace{
/*Use RAII to make sure the memory block is released
*(ie pushed back to the memory manager) in any case (exception or not)
*/
class AutoPushBlock{
castor::tape::tapeserver::daemon::MemBlock *block;
castor::tape::tapeserver::daemon::DataConsumer& next;
public:
AutoPushBlock(castor::tape::tapeserver::daemon::MemBlock* mb,
castor::tape::tapeserver::daemon::DataConsumer& task):
block(mb),next(task){}
~AutoPushBlock(){
next.pushDataBlock(block);
}
};
}
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
DiskReadTask::DiskReadTask(DataConsumer & destination, tape::tapegateway::FileToMigrateStruct* file):
m_nextTask(destination),m_migratedFile(file) {}
void DiskReadTask::execute(log::LogContext& lc) {
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
try{
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
while(migratingFileSize>0){
blockId++;
MemBlock* const mb = m_nextTask.getFreeBlock();
AutoPushBlock push(mb,m_nextTask);
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileBlock = blockId++;
migratingFileSize -= mb->m_payload.read(sourceFile);
//we either read at full capacity (ie size=capacity) or if there different,
//it should be the end => migratingFileSize should be 0. If it not, error
if(mb->m_payload.size() != mb->m_payload.capacity() && migratingFileSize>0){
mb->m_failled=true;
throw castor::tape::Exception("Error while reading a file. Did not read at full capacity but the file is not fully read");
}
} //end of while(migratingFileSize>0)
}
catch(const castor::tape::Exception& e){
//we have to pump block anyway, mark them failed and then pass them to TapeWrite
//Otherwise they would be stuck into TapeWriteTask free block fifo
using log::LogContext;
using log::Param;
LogContext::ScopedParam sp(lc, Param("blockID",blockId));
lc.log(LOG_ERR,e.getMessageValue());
while(migratingFileSize>0) {
MemBlock * mb = m_nextTask.getFreeBlock();
mb->m_failled=true;
m_nextTask.pushDataBlock(mb);
} //end of while
} //end of catch
}
}}}}
......@@ -36,47 +36,8 @@ namespace daemon {
class DiskReadTask :public DiskReadTaskInterface {
public:
DiskReadTask(DataConsumer & destination, tape::tapegateway::FileToMigrateStruct* file):
m_nextTask(destination),m_migratedFile(file) {}
virtual void execute(log::LogContext& lc) {
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
try{
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
while(migratingFileSize>0){
blockId++;
MemBlock * mb = m_nextTask.getFreeBlock();
mb->m_fileid = m_migratedFile->fileid();
//mb->m_fileBlock = blockId;
mb->m_payload.read(sourceFile);
migratingFileSize-= mb->m_payload.size();
//we either read at full capacity (ie size=capacity) or if there different,
//it should be the end => migratingFileSize should be 0. If it not, error
if(mb->m_payload.size() != mb->m_payload.capacity() && migratingFileSize>0){
throw castor::tape::Exception("Error while reading a file. Did not read at full capacity but the file is not fully read");
}
m_nextTask.pushDataBlock(mb);
} //end of while(migratingFileSize>0)
}
catch(const castor::tape::Exception& e){
//we have to pump block anyway, mark them failed and then pass them to TapeWrite
//Otherwise they would be stuck into TapeWriteTask free block fifo
using log::LogContext;
using log::Param;
LogContext::ScopedParam sp(lc, Param("blockID",blockId));
lc.log(LOG_ERR,e.getMessageValue());
while(migratingFileSize>0) {
MemBlock * mb = m_nextTask.getFreeBlock();
mb->m_failled=true;
m_nextTask.pushDataBlock(mb);
} //end of while
} //end of catch
}
DiskReadTask(DataConsumer & destination, tape::tapegateway::FileToMigrateStruct* file);
virtual void execute(log::LogContext& lc);
private:
//TW ; tape write
DataConsumer & m_nextTask;
......
/******************************************************************************
* DiskReadThreadPool.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include <memory>
#include <sstream>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
DiskReadThreadPool::DiskReadThreadPool(int nbThread,castor::log::LogContext lc) : m_lc(lc){
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
}
}
DiskReadThreadPool::~DiskReadThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
}
void DiskReadThreadPool::startThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
}
}
void DiskReadThreadPool::waitThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->waitThreads();
}
}
void DiskReadThreadPool::push(DiskReadTaskInterface *t) {
m_tasks.push(t);
}
void DiskReadThreadPool::finish() {
/* Insert one endOfSession per thread */
for (size_t i=0; i<m_threads.size(); i++) {
m_tasks.push(NULL);
}
}
void DiskReadThreadPool::DiskReadWorkerThread::run() {
std::auto_ptr<DiskReadTaskInterface> task;
while(1) {
task.reset( _this.m_tasks.pop());
if (NULL!=task.get())
task->execute(lc);
else
break;
}
}
tape::threading::AtomicCounter<int> DiskReadThreadPool::DiskReadWorkerThread::m_nbActiveThread(0);
}}}}
\ No newline at end of file
......@@ -28,6 +28,8 @@
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/daemon/DiskThreadPoolInterface.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
#include "castor/log/LogContext.hpp"
#include <vector>
namespace castor {
......@@ -37,57 +39,30 @@ namespace daemon {
class DiskReadThreadPool : public DiskThreadPoolInterface<DiskReadTaskInterface> {
public:
DiskReadThreadPool(int nbThread,castor::log::LogContext lc) : m_lc(lc){
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
}
}
~DiskReadThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
}
void startThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
}
}
void waitThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->waitThreads();
}
}
virtual void push(DiskReadTaskInterface *t) { m_tasks.push(t); }
void finish() {
/* Insert one endOfSession per thread */
for (size_t i=0; i<m_threads.size(); i++) {
m_tasks.push(NULL);
}
}
DiskReadThreadPool(int nbThread,castor::log::LogContext lc);
~DiskReadThreadPool();
void startThreads();
void waitThreads();
virtual void push(DiskReadTaskInterface *t);
void finish();
private:
class DiskReadWorkerThread: private castor::tape::threading::Thread {
public:
DiskReadWorkerThread(DiskReadThreadPool & manager): _this(manager) {}
DiskReadWorkerThread(DiskReadThreadPool & manager):
threadID(m_nbActiveThread++),_this(manager),lc(_this.m_lc) {
log::LogContext::ScopedParam param(lc, log::Param("threadID", threadID));
lc.log(LOG_INFO,"DiskWrite Thread created");
}
void startThreads() { start(); }
void waitThreads() { wait(); }
private:
static tape::threading::AtomicCounter<int> m_nbActiveThread;
const int threadID;
DiskReadThreadPool & _this;
virtual void run() {
castor::log::LogContext lc = _this.m_lc;
std::auto_ptr<DiskReadTaskInterface> task;
while(1) {
task.reset( _this.m_tasks.pop());
if (NULL!=task.get())
task->execute(lc);
else
break;
}
}
castor::log::LogContext lc;
virtual void run();
};
std::vector<DiskReadWorkerThread *> m_threads;
castor::log::LogContext m_lc;
......
/******************************************************************************
* DiskWriteTask.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
namespace {
/*Use RAII to make sure the memory block is released
*(ie pushed back to the memory manager) in any case (exception or not)
*/
class AutoReleaseBlock{
castor::tape::tapeserver::daemon::MemBlock *block;
castor::tape::tapeserver::daemon::RecallMemoryManager& memManager;
public:
AutoReleaseBlock(castor::tape::tapeserver::daemon::MemBlock* mb,
castor::tape::tapeserver::daemon::RecallMemoryManager& mm):
block(mb),memManager(mm){}
~AutoReleaseBlock(){
memManager.releaseBlock(block);
}
};
}
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* Constructor
* @param file: All we need to know about the file we are recalling
* @param mm: memory manager of the session
*/
DiskWriteTask::DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm):
m_recallingFile(file),m_memManager(mm){
}
/**
* Main routine: takes each memory block in the fifo and writes it to disk
* @return true if the file has been successfully written false otherwise.
*/
bool DiskWriteTask::execute(ReportPackerInterface<detail::Recall>& reporter,log::LogContext& lc) {
using log::LogContext;
using log::Param;
try{
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
int blockId = 0;
while(1) {
if(MemBlock* const mb = m_fifo.pop()) {
AutoReleaseBlock releaser(mb,m_memManager);
if(m_recallingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->m_failled ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("expected_NSFILEID",m_recallingFile->fileid())),
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->m_failled))
};
tape::utils::suppresUnusedVariable(sp);
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
}
mb->m_payload.write(ourFile);
blockId++;
}
else
break;
} //end of while(1)
reporter.reportCompletedJob(*m_recallingFile);
return true;
}
catch(const castor::exception::Exception& e){
/*
*We might end up there with some blocks into m_fifo
* We need to empty it
*/
releaseAllBlock();
reporter.reportFailedJob(*m_recallingFile,e.getMessageValue(),e.code());
return false;
}
}
/**
* Allows client code to return a reusable memory block. Should not been called
* @return the pointer to the memory block that can be reused
*/
MemBlock *DiskWriteTask::getFreeBlock() {
throw castor::tape::Exception("DiskWriteTask::getFreeBlock should mot be called");
}
void DiskWriteTask::pushDataBlock(MemBlock *mb) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.push(mb);
}
DiskWriteTask::~DiskWriteTask() {
volatile castor::tape::threading::MutexLocker ml(&m_producerProtection);
}
void DiskWriteTask::releaseAllBlock(){
while(1){
MemBlock* mb=m_fifo.pop();
if(mb)
AutoReleaseBlock release(mb,m_memManager);
else
break;
}
}
}}}}
\ No newline at end of file
......@@ -24,12 +24,14 @@
#pragma once
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteTaskInterface.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include <memory>
namespace {
......@@ -37,22 +39,7 @@ namespace {
{
return (ftr.blockId0() << 24) | (ftr.blockId1() << 16) | (ftr.blockId2() << 8) | ftr.blockId3();
}
/*Use RAII to make sure the memory block is released
*(ie pushed back to the memory manager) in any case (exception or not)
*/
class AutoReleaseBlock{
castor::tape::tapeserver::daemon::MemBlock *block;
castor::tape::tapeserver::daemon::RecallMemoryManager& memManager;
public:
AutoReleaseBlock(castor::tape::tapeserver::daemon::MemBlock* mb,
castor::tape::tapeserver::daemon::RecallMemoryManager& mm):
block(mb),memManager(mm){}
~AutoReleaseBlock(){
memManager.releaseBlock(block);
}
};
}
namespace castor {
namespace tape {
......@@ -71,94 +58,33 @@ public:
* @param file: All we need to know about the file we are recalling
* @param mm: memory manager of the session
*/
DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm):
m_recallingFile(file),m_memManager(mm){
}
DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm);
/**
* Main routine: takes each memory block in the fifo and writes it to disk
* @return true if the file has been successfully written false otherwise.
*/
virtual bool execute(ReportPackerInterface<detail::Recall>& reporter,log::LogContext& lc) {
using log::LogContext;
using log::Param;
try{
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
int blockId = 0;
while(1) {
if(MemBlock* const mb = m_fifo.pop()) {
AutoReleaseBlock releaser(mb,m_memManager);
if(m_recallingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->m_failled ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("expected_NSFILEID",m_recallingFile->fileid())),
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->m_failled))
};
tape::utils::suppresUnusedVariable(sp);
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
}
mb->m_payload.write(ourFile);
blockId++;
}
else
break;
} //end of while(1)
reporter.reportCompletedJob(*m_recallingFile);
return true;
}
catch(const castor::exception::Exception& e){
/*
*We might end up there with some blocks into m_fifo
* We need to empty it
*/
releaseAllBlock();
reporter.reportFailedJob(*m_recallingFile,e.getMessageValue(),e.code());
return false;
}
}
virtual bool execute(ReportPackerInterface<detail::Recall>& reporter,log::LogContext& lc) ;
/**
* Allows client code to return a reusable memory block. Should not been called
* @return the pointer to the memory block that can be reused
*/
virtual MemBlock *getFreeBlock() {
throw castor::tape::Exception("DiskWriteTask::getFreeBlock should mot be called");
}
virtual MemBlock *getFreeBlock() ;
/**
* Function used to enqueue a new memory block holding data to be written to disk
* @param mb: corresponding memory block
*/
virtual void pushDataBlock(MemBlock *mb) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.push(mb);
}
virtual void pushDataBlock(MemBlock *mb);
/**
* Destructor (also waiting for the end of the write operation)
*/
virtual ~DiskWriteTask() {
volatile castor::tape::threading::MutexLocker ml(&m_producerProtection);
}
virtual ~DiskWriteTask();
private: