Commit 29a26056 authored by David COME's avatar David COME
Browse files

Changed MemManager to RecallMemoryManager

RecallMemoryManager is the memory manager dedicated to the recall. All recall-related files are now using RecallMemoryManager
parent 5ee1eed6
......@@ -26,7 +26,7 @@
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
......@@ -43,10 +43,10 @@ namespace {
*/
class AutoReleaseBlock{
castor::tape::tapeserver::daemon::MemBlock *block;
castor::tape::tapeserver::daemon::MemoryManager& memManager;
castor::tape::tapeserver::daemon::RecallMemoryManager& memManager;
public:
AutoReleaseBlock(castor::tape::tapeserver::daemon::MemBlock* mb,
castor::tape::tapeserver::daemon::MemoryManager& mm):
castor::tape::tapeserver::daemon::RecallMemoryManager& mm):
block(mb),memManager(mm){}
~AutoReleaseBlock(){
......@@ -71,7 +71,7 @@ public:
* @param file: All we need to know about the file we are recalling
* @param mm: memory manager of the session
*/
DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,MemoryManager& mm):
DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm):
m_recallingFile(file),m_memManager(mm){
}
......@@ -171,7 +171,7 @@ private:
/**
* Reference to the Memory Manager in use
*/
MemoryManager & m_memManager;
RecallMemoryManager & m_memManager;
/**
* Mutex forcing serial access to the fifo
......
......@@ -43,7 +43,6 @@ namespace daemon {
}
}
m_tasks.push(t);
std::cout<<"size tasks "<<m_tasks.size()<<std::endl;
}
void DiskWriteThreadPool::finish() {
castor::tape::threading::MutexLocker ml(&m_counterProtection);
......@@ -121,6 +120,8 @@ namespace daemon {
//Im the last Thread alive, report end of session
if(failledWritting==0){
_this.m_reporter.reportEndOfSession();
//TODO
// _this.m_jobInjector->end();
}
else{
_this.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
......
......@@ -67,7 +67,7 @@ private:
class DiskWriteWorkerThread: private castor::tape::threading::Thread {
public:
DiskWriteWorkerThread(DiskWriteThreadPool & manager):
threadID(m_nbActiveThread++),_this(manager)
threadID(m_nbActiveThread++),_this(manager),lc(_this.m_lc)
{
log::LogContext::ScopedParam param(lc, log::Param("threadID", threadID));
lc.log(LOG_INFO,"DiskWrite Thread created");
......
......@@ -39,8 +39,7 @@ namespace unitTests{
//EXPECT_CALL(tskInjectorl,requestInjection(_,_,_)).Times(2);
EXPECT_CALL(report,reportEndOfSession()).Times(1);
MemoryManager mm(10,100);
// mm.startThreads();
RecallMemoryManager mm(10,100);
DiskWriteThreadPool dwtp(2,5,500,report,lc);
dwtp.setJobInjector(&tskInjectorl);
......@@ -51,7 +50,6 @@ namespace unitTests{
file.setBlockId3(1);
for(int i=0;i<5;++i){
DiskWriteTask* t=new DiskWriteTask(dynamic_cast<tapegateway::FileToRecallStruct*>(file.clone()),mm);
MemBlock* mb=mm.getFreeBlock();
mb->m_fileid=0;
......@@ -63,7 +61,6 @@ namespace unitTests{
dwtp.finish();
dwtp.waitThreads();
// mm.waitThreads();
}
}
......@@ -244,7 +244,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
MemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz);
RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz);
TapeReadSingleThread trst(*drive);
RecallReportPacker rrp(m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
......@@ -261,7 +261,6 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
// We got something to recall. Time to start the machinery
trst.startThreads();
mm.startThreads();
dwtp.startThreads();
rrp.startThreads();
rti.startThreads();
......@@ -272,7 +271,6 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
rti.waitThreads();
rrp.waitThread();
dwtp.waitThreads();
mm.waitThreads();
trst.waitThreads();
} else {
// Just log this was an empty mount and that's it. The memory management
......
/******************************************************************************
* RecallMemoryManager.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemManagerClient.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/exception/Exception.hpp"
#include <iostream>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* The memory manager is responsible for allocating memory blocks and distributing
* the free ones around to any class in need.
*/
class RecallMemoryManager {
public:
/**
* Constructor
* @param numberOfBlocks: number of blocks to allocate
* @param blockSize: size of each block
*/
RecallMemoryManager(const size_t numberOfBlocks, const size_t blockSize)
: m_totalNumberOfBlocks(numberOfBlocks) {
for (size_t i = 0; i < numberOfBlocks; i++) {
m_freeBlocks.push(new MemBlock(i, blockSize));
}
}
/**
* Are all sheep back to the farm?
* @return
*/
bool areBlocksAllBack() throw() {
return m_totalNumberOfBlocks==m_freeBlocks.size();
}
/**
* Takes back a block which has been released by one of the clients
* @param mb: the pointer to the block
*/
void releaseBlock(MemBlock *mb) {
mb->reset();
m_freeBlocks.push(mb);
}
MemBlock* getFreeBlock(){
return m_freeBlocks.pop();
}
/**
* Destructor
*/
~RecallMemoryManager() {
// Make sure the thread is finished: this should be done by the caller,
// who should have called waitThreads.
// castor::tape::threading::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
try {
while(true) {
delete m_freeBlocks.tryPop();
}
}
catch (castor::tape::threading::noMore) {
//done
}
}
private:
/**
* Total number of allocated memory blocks
*/
size_t m_totalNumberOfBlocks;
/**
* Container for the free blocks
*/
castor::tape::threading::BlockingQueue<MemBlock*> m_freeBlocks;
};
}}}}
......@@ -21,7 +21,7 @@ namespace tape{
namespace tapeserver{
namespace daemon {
RecallTaskInjector::RecallTaskInjector(MemoryManager & mm,
RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTask> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,
client::ClientInterface& client,castor::log::LogContext lc) :
......
......@@ -48,7 +48,7 @@ namespace daemon {
class RecallTaskInjector: public TaskInjector {
public:
RecallTaskInjector(MemoryManager & mm,
RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTask> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,client::ClientInterface& client,
castor::log::LogContext lc);
......@@ -124,7 +124,7 @@ private:
RecallTaskInjector & _this;
} m_thread;
MemoryManager & m_memManager;
RecallMemoryManager & m_memManager;
TapeSingleThreadInterface<TapeReadTask> & m_tapeReader;
......
......@@ -56,7 +56,7 @@ public:
};
TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNominal) {
MemoryManager mm(50U,50U);
RecallMemoryManager mm(50U,50U);
const int nbCalls=2;
castor::log::StringLogger log("castor_tape_tapeserver_daemon_RecallTaskInjectorTest");
castor::log::LogContext lc(log);
......@@ -100,7 +100,7 @@ TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNominal) {
}
}
TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNoFiles) {
MemoryManager mm(50U,50U);
RecallMemoryManager mm(50U,50U);
castor::log::StringLogger log("castor_tape_tapeserver_daemon_RecallTaskInjectorTest");
castor::log::LogContext lc(log);
......
......@@ -26,7 +26,7 @@
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
......@@ -36,7 +36,7 @@ namespace tapeserver {
namespace daemon {
class TapeReadFileTask: public TapeReadTask {
public:
TapeReadFileTask(MemoryManager & source,DataConsumer & destination, int fSeq, int blockCount): m_fSeq(fSeq),
TapeReadFileTask(RecallMemoryManager & source,DataConsumer & destination, int fSeq, int blockCount): m_fSeq(fSeq),
m_blockCount(blockCount), m_memManager(source),m_diskWriteTask(destination) {}
/* Implementation of the TapeReadTask interface*/
virtual bool endOfWork() { return false; }
......@@ -53,7 +53,7 @@ public:
private:
int m_fSeq;
int m_blockCount;
MemoryManager & m_memManager;
RecallMemoryManager & m_memManager;
DataConsumer & m_diskWriteTask;
};
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment