/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2003-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
MigrationMemoryManager::MigrationMemoryManager(const size_t numberOfBlocks,
const size_t blockSize, cta::log::LogContext lc)
:
m_blockCapacity(blockSize), m_totalNumberOfBlocks(0),
m_totalMemoryAllocated(0), m_blocksProvided(0),
m_blocksReturned(0), m_lc(lc)
{
for (size_t i = 0; i < numberOfBlocks; i++) {
m_freeBlocks.push(new MemBlock(i, blockSize));
m_totalNumberOfBlocks++;
m_totalMemoryAllocated += blockSize;
}
m_lc.log(cta::log::INFO, "MigrationMemoryManager: all blocks have been created");
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::~MigrationMemoryManager
//------------------------------------------------------------------------------
MigrationMemoryManager::~MigrationMemoryManager() throw() {
// Make sure the thread is finished: this should be done by the caller,
// who should have called waitThreads.
// castor::server::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
cta::threading::BlockingQueue::valueRemainingPair ret;
do {
ret = m_freeBlocks.popGetSize();
delete ret.value;
} while (ret.remaining > 0);
m_lc.log(cta::log::INFO, "MigrationMemoryManager destruction : all memory blocks have been deleted");
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::startThreads
//------------------------------------------------------------------------------
void MigrationMemoryManager::startThreads() {
cta::threading::Thread::start();
m_lc.log(cta::log::INFO, "MigrationMemoryManager starting thread");
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::waitThreads
//------------------------------------------------------------------------------
void MigrationMemoryManager::waitThreads() {
cta::threading::Thread::wait();
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::addClient
//------------------------------------------------------------------------------
void MigrationMemoryManager::addClient(DataPipeline* c)
{
m_clientQueue.push(c);
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::areBlocksAllBack
//------------------------------------------------------------------------------
bool MigrationMemoryManager::areBlocksAllBack()
throw(){
return m_totalNumberOfBlocks == m_freeBlocks.size();
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::blockCapacity
//------------------------------------------------------------------------------
size_t MigrationMemoryManager::blockCapacity() {
return m_blockCapacity;
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::finish
//------------------------------------------------------------------------------
void MigrationMemoryManager::finish()
{
addClient(NULL);
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::releaseBlock
//------------------------------------------------------------------------------
void MigrationMemoryManager::releaseBlock(MemBlock* mb)
{
mb->reset();
m_freeBlocks.push(mb);
{
cta::threading::MutexLocker ml(m_countersMutex);
m_blocksReturned++;
}
}
//------------------------------------------------------------------------------
// MigrationMemoryManager::run
//------------------------------------------------------------------------------
void MigrationMemoryManager::run() {
while (true) {
DataPipeline* c = m_clientQueue.pop();
// If the c is a NULL pointer, that means end of clients
if (!c) return;
// Spin on the the client. We rely on the fact that he will want
// at least one block (which is the case currently)
while (c->provideBlock(m_freeBlocks.pop())) {
cta::threading::MutexLocker ml(m_countersMutex);
m_blocksProvided++;
}
}
}
}
}
}
}