Commit 4ebc3e5e authored by Eric Cano's avatar Eric Cano
Browse files

Removed interface class MemoryManagerClient

Split the memory managers into headers and implementation.
Cut the number of includes by forward declaring classes.
parent e5b1f960
...@@ -42,7 +42,9 @@ add_library(tapeserver ...@@ -42,7 +42,9 @@ add_library(tapeserver
TapeWriteTask.cpp TapeWriteTask.cpp
DiskReadTask.cpp DiskReadTask.cpp
DiskWriteTask.cpp DiskWriteTask.cpp
MigrationTaskInjector.cpp) MigrationTaskInjector.cpp
MigrationMemoryManager.cpp
RecallMemoryManager.cpp)
add_library(tapeserverdTest add_library(tapeserverdTest
......
...@@ -24,18 +24,18 @@ ...@@ -24,18 +24,18 @@
#pragma once #pragma once
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
/** /**
* Abstract class used as a base class for the disk/tape write file tasks. The data consumer * Abstract class used as a base class for the disk/tape write file tasks. The data consumer
* has two methods: "pushDataBlock" used to put in the consumer's fifo a new full memory block * has two methods: "pushDataBlock" used to put in the consumer's fifo a new full memory block
* to consume, and "getFreeBlock" used by client code two reclaim the consumed memory block. * to consume, and "getFreeBlock" used by client code two reclaim the consumed memory block.
*/ */
namespace castor { namespace castor {
namespace tape { namespace tape {
namespace tapeserver { namespace tapeserver {
namespace daemon { namespace daemon {
// Antcipated declaration to hasten compilation
class MemBlock;
class DataConsumer { class DataConsumer {
public: public:
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp" #include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp" #include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemManagerClient.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp" #include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/exception/Exception.hpp" #include "castor/exception/Exception.hpp"
...@@ -51,7 +50,7 @@ namespace daemon { ...@@ -51,7 +50,7 @@ namespace daemon {
pushDataBlock +------------------------------+ getDataBlock pushDataBlock +------------------------------+ getDataBlock
*/ */
class DataFifo : public MemoryManagerClient { class DataFifo {
public: public:
/** /**
* Constructor * Constructor
......
...@@ -8,7 +8,8 @@ ...@@ -8,7 +8,8 @@
#include "castor/tape/tapeserver/client/ClientInterface.hpp" #include "castor/tape/tapeserver/client/ClientInterface.hpp"
#include "castor/log/LogContext.hpp" #include "castor/log/LogContext.hpp"
#include "castor/log/StringLogger.hpp" #include "castor/log/StringLogger.hpp"
#include "castor/tape/tapeserver/daemon/MemManager.hpp" #include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include <gtest/gtest.h> #include <gtest/gtest.h>
namespace unitTests{ namespace unitTests{
......
...@@ -8,7 +8,8 @@ ...@@ -8,7 +8,8 @@
#include "castor/tape/tapeserver/client/ClientInterface.hpp" #include "castor/tape/tapeserver/client/ClientInterface.hpp"
#include "castor/log/LogContext.hpp" #include "castor/log/LogContext.hpp"
#include "castor/log/StringLogger.hpp" #include "castor/log/StringLogger.hpp"
#include "castor/tape/tapeserver/daemon/MemManager.hpp" #include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include <gtest/gtest.h> #include <gtest/gtest.h>
namespace unitTests{ namespace unitTests{
......
/******************************************************************************
* MigrationMemoryManager.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
MigrationMemoryManager::MigrationMemoryManager(const size_t numberOfBlocks,
const size_t blockSize, castor::log::LogContext lc)
throw(castor::exception::Exception):
m_blockCapacity(blockSize), m_totalNumberOfBlocks(0),
m_totalMemoryAllocated(0), m_blocksProvided(0),
m_blocksReturned(0), m_lc(lc)
{
for (size_t i = 0; i < numberOfBlocks; i++) {
m_freeBlocks.push(new MemBlock(i, blockSize));
m_totalNumberOfBlocks++;
m_totalMemoryAllocated += blockSize;
m_lc.pushOrReplace(log::Param("blockId", i));
m_lc.log(LOG_DEBUG, "MigrationMemoryManager Created a block");
}
m_lc.log(LOG_INFO, "MigrationMemoryManager: all blocks have been created");
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::~MigrationMemoryManager
//------------------------------------------------------------------------------
MigrationMemoryManager::~MigrationMemoryManager() throw() {
// Make sure the thread is finished: this should be done by the caller,
// who should have called waitThreads.
// castor::tape::threading::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
castor::tape::threading::BlockingQueue<MemBlock*>::valueRemainingPair ret;
do {
ret = m_freeBlocks.popGetSize();
delete ret.value;
} while (ret.remaining > 0);
m_lc.log(LOG_INFO, "MigrationMemoryManager destruction : all memory blocks have been deleted");
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::startThreads
//------------------------------------------------------------------------------
void MigrationMemoryManager::startThreads() throw(castor::exception::Exception) {
castor::tape::threading::Thread::start();
m_lc.log(LOG_INFO, "MigrationMemoryManager starting thread");
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::waitThreads
//------------------------------------------------------------------------------
void MigrationMemoryManager::waitThreads() throw(castor::exception::Exception) {
castor::tape::threading::Thread::wait();
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::addClient
//------------------------------------------------------------------------------
void MigrationMemoryManager::addClient(DataFifo* c)
throw(castor::exception::Exception) {
m_clientQueue.push(c);
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::areBlocksAllBack
//------------------------------------------------------------------------------
bool MigrationMemoryManager::areBlocksAllBack()
throw(){
return m_totalNumberOfBlocks == m_freeBlocks.size();
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::blockCapacity
//------------------------------------------------------------------------------
size_t MigrationMemoryManager::blockCapacity() {
return m_blockCapacity;
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::finish
//------------------------------------------------------------------------------
void MigrationMemoryManager::finish()
throw(castor::exception::Exception) {
addClient(NULL);
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::releaseBlock
//------------------------------------------------------------------------------
void MigrationMemoryManager::releaseBlock(MemBlock* mb)
throw(castor::exception::Exception) {
mb->reset();
m_freeBlocks.push(mb);
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
m_blocksReturned++;
}
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::run
//------------------------------------------------------------------------------
void MigrationMemoryManager::run() throw(castor::exception::Exception) {
while (true) {
DataFifo* c = m_clientQueue.pop();
// If the c is a NULL pointer, that means end of clients
if (!c) return;
// Spin on the the client. We rely on the fact that he will want
// at least one block (which is the case currently)
while (c->provideBlock(m_freeBlocks.pop())) {
castor::tape::threading::MutexLocker ml(&m_countersMutex);
m_blocksProvided++;
}
}
}
}
}
}
}
/****************************************************************************** /******************************************************************************
* MemManager.hpp * MigrationMemoryManager.hpp
* *
* This file is part of the Castor project. * This file is part of the Castor project.
* See http://castor.web.cern.ch/castor * See http://castor.web.cern.ch/castor
...@@ -24,18 +24,22 @@ ...@@ -24,18 +24,22 @@
#pragma once #pragma once
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemManagerClient.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp" #include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp" #include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/exception/Exception.hpp"
#include "castor/log/LogContext.hpp" #include "castor/log/LogContext.hpp"
#include <iostream>
namespace castor { namespace castor {
namespace exception {
// Forward declaration
class Exception;
}
namespace tape { namespace tape {
namespace tapeserver { namespace tapeserver {
namespace daemon { namespace daemon {
// Forward declaration
class TapeWriteTask;
class MemBlock;
class DataFifo;
/** /**
* The memory manager is responsible for allocating memory blocks and distributing * The memory manager is responsible for allocating memory blocks and distributing
* the free ones around to any class in need. The distribution is actively run in * the free ones around to any class in need. The distribution is actively run in
...@@ -49,98 +53,54 @@ public: ...@@ -49,98 +53,54 @@ public:
* @param numberOfBlocks: number of blocks to allocate * @param numberOfBlocks: number of blocks to allocate
* @param blockSize: size of each block * @param blockSize: size of each block
*/ */
MigrationMemoryManager(const size_t numberOfBlocks, const size_t blockSize, castor::log::LogContext lc) MigrationMemoryManager(const size_t numberOfBlocks, const size_t blockSize,
throw(castor::exception::Exception) : castor::log::LogContext lc)
m_blockCapacity(blockSize), m_totalNumberOfBlocks(0), throw(castor::exception::Exception);
m_totalMemoryAllocated(0), m_blocksProvided(0), m_blocksReturned(0),m_lc(lc) {
for (size_t i = 0; i < numberOfBlocks; i++) {
m_freeBlocks.push(new MemBlock(i, blockSize));
m_totalNumberOfBlocks++;
m_totalMemoryAllocated+=blockSize;
m_lc.pushOrReplace(log::Param("blockId",i));
m_lc.log(LOG_DEBUG,"MigrationMemoryManager Created a block");
}
m_lc.log(LOG_INFO,"MigrationMemoryManager: all blocks have been created");
}
/** /**
* *
* @return the nominal capacity of one block * @return the nominal capacity of one block
*/ */
size_t blockCapacity(){ size_t blockCapacity();
return m_blockCapacity;
}
/** /**
* Are all sheep back to the farm? * Are all sheep back to the farm?
* @return * @return
*/ */
bool areBlocksAllBack() throw() { bool areBlocksAllBack() throw();
return m_totalNumberOfBlocks==m_freeBlocks.size();
}
/** /**
* Start serving clients (in the dedicated thread) * Start serving clients (in the dedicated thread)
*/ */
void startThreads() throw(castor::exception::Exception) { void startThreads() throw(castor::exception::Exception);
castor::tape::threading::Thread::start();
m_lc.log(LOG_INFO,"MigrationMemoryManager starting thread");
}
/** /**
* Waiting for clients to finish (in the dedicated thread) * Waiting for clients to finish (in the dedicated thread)
*/ */
void waitThreads() throw(castor::exception::Exception) { void waitThreads() throw(castor::exception::Exception);
castor::tape::threading::Thread::wait();
}
/** /**
* Adds a new client in need for free memory blocks * Adds a new client in need for free memory blocks
* @param c: the new client * @param c: the new client
*/ */
void addClient(MemoryManagerClient* c) throw(castor::exception::Exception) { void addClient(DataFifo* c) throw(castor::exception::Exception);
m_clientQueue.push(c);
}
/** /**
* Takes back a block which has been released by one of the clients * Takes back a block which has been released by one of the clients
* @param mb: the pointer to the block * @param mb: the pointer to the block
*/ */
void releaseBlock(MemBlock *mb) throw(castor::exception::Exception) { void releaseBlock(MemBlock *mb) throw(castor::exception::Exception);
mb->reset();
m_freeBlocks.push(mb);
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
m_blocksReturned++;
}
}
/** /**
* Function used to specify that there are no more clients for this memory manager. * Function used to specify that there are no more clients for this memory manager.
* See the definition of endOfClients below. * See the definition of endOfClients below.
*/ */
void finish() throw(castor::exception::Exception) { void finish() throw(castor::exception::Exception);
addClient(NULL);
}
/** /**
* Destructor * Destructor
*/ */
~MigrationMemoryManager() throw() { ~MigrationMemoryManager() throw();
// Make sure the thread is finished: this should be done by the caller,
// who should have called waitThreads.
// castor::tape::threading::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
castor::tape::threading::BlockingQueue<MemBlock*>::valueRemainingPair ret;
do{
ret=m_freeBlocks.popGetSize();
delete ret.value;
}while(ret.remaining>0);
m_lc.log(LOG_INFO,"MigrationMemoryManager destruction : all memory blocks have been deleted");
}
private: private:
...@@ -180,7 +140,7 @@ private: ...@@ -180,7 +140,7 @@ private:
* The client queue: we will feed them as soon as blocks * The client queue: we will feed them as soon as blocks
* become free. This is done in a dedicated thread. * become free. This is done in a dedicated thread.
*/ */
castor::tape::threading::BlockingQueue<MemoryManagerClient *> m_clientQueue; castor::tape::threading::BlockingQueue<DataFifo *> m_clientQueue;
/** /**
* Logging purpose. Given the fact the class is threaded, the LogContext * Logging purpose. Given the fact the class is threaded, the LogContext
...@@ -191,27 +151,7 @@ private: ...@@ -191,27 +151,7 @@ private:
/** /**
* Thread routine: pops a client and provides him blocks until he is happy! * Thread routine: pops a client and provides him blocks until he is happy!
*/ */
void run() throw(castor::exception::Exception) { void run() throw(castor::exception::Exception);
while(true) {
MemoryManagerClient* c = m_clientQueue.pop();
// If the c is a NULL pointer, that means end of client
if (!c) {
return;
};
/* Spin on the the client. We rely on the fact that he will want
at least one block (which is the case currently) */
while (c->provideBlock(m_freeBlocks.pop())) {
{
castor::tape::threading::MutexLocker ml (&m_countersMutex);
m_blocksProvided++;
}
}
}
}
}; };
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#pragma once #pragma once
#include "castor/tape/tapeserver/daemon/MemManager.hpp" #include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp" #include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp" #include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp" #include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
......
/****************************************************************************** /******************************************************************************
* MemManagerClient.hpp * RecallMemoryManager.hpp
* *
* This file is part of the Castor project. * This file is part of the Castor project.
* See http://castor.web.cern.ch/castor * See http://castor.web.cern.ch/castor
...@@ -22,22 +22,71 @@ ...@@ -22,22 +22,71 @@
* @author Castor Dev team, castor-dev@cern.ch * @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/ *****************************************************************************/
#pragma once #include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp" #include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/exception/Exception.hpp"
namespace castor { namespace castor {
namespace tape { namespace tape {
namespace tapeserver { namespace tapeserver {
namespace daemon { namespace daemon {
class MemoryManagerClient { //------------------------------------------------------------------------------
public: // Constructor
virtual bool provideBlock(MemBlock */*mb*/) =0; //------------------------------------------------------------------------------
virtual ~MemoryManagerClient() throw() {} RecallMemoryManager::RecallMemoryManager(const size_t numberOfBlocks, const size_t blockSize, castor::log::LogContext& lc)
}; : m_totalNumberOfBlocks(numberOfBlocks), m_lc(lc) {
for (size_t i = 0; i < numberOfBlocks; i++) {
m_freeBlocks.push(new MemBlock(i, blockSize));
m_lc.pushOrReplace(log::Param("blockId", i));
m_lc.log(LOG_DEBUG, "RecallMemoryManager created a block");
}
m_lc.log(LOG_INFO, "RecallMemoryManager: all blocks have been created");
}
//------------------------------------------------------------------------------
// RecallMemoryManager::~RecallMemoryManager
//------------------------------------------------------------------------------
RecallMemoryManager::~RecallMemoryManager() {
// Make sure the thread is finished: this should be done by the caller,
// who should have called waitThreads.
// castor::tape::threading::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
castor::tape::threading::BlockingQueue<MemBlock*>::valueRemainingPair ret;
do {
ret = m_freeBlocks.popGetSize();
delete ret.value;
} while (ret.remaining > 0);
m_lc.log(LOG_INFO, "RecallMemoryManager destruction : all memory blocks have been deleted");
}
//------------------------------------------------------------------------------
// RecallMemoryManager::~RecallMemoryManager
//------------------------------------------------------------------------------
bool RecallMemoryManager::areBlocksAllBack() throw() {
return m_totalNumberOfBlocks == m_freeBlocks.size();
}
//------------------------------------------------------------------------------
// RecallMemoryManager::~RecallMemoryManager
//------------------------------------------------------------------------------
MemBlock* RecallMemoryManager::getFreeBlock() {
return m_freeBlocks.pop();
}
//------------------------------------------------------------------------------
// RecallMemoryManager::~RecallMemoryManager
//------------------------------------------------------------------------------
void RecallMemoryManager::releaseBlock(MemBlock* mb) {
m_lc.pushOrReplace(log::Param("blockId", mb->m_memoryBlockId));
m_lc.log(LOG_DEBUG, "RecallMemoryManager A block has been released");
mb->reset();
m_freeBlocks.push(mb);
}