Commit 2e905cd4 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

First memory and threading draft

parent a194d3a1
......@@ -17,6 +17,10 @@ target_link_libraries(tapeserverd
castorclient
castortapelegacymsg)
add_executable(tapeserver-mm tapeserver-mm.cpp)
target_link_libraries(tapeserver-mm TapeDrive Exception SCSI System Utils File castorcommon castorclient)
add_library(tapeserver ClientInterface.cpp MountSession.cpp)
add_library(tapeserverdTest
......
/******************************************************************************
* DataConsumer.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
/**
* Abstract class used as a base class for the disk/tape write file tasks. The data consumer
* has two methods: "pushDataBlock" used to put in the consumer's fifo a new full memory block
* to consume, and "getFreeBlock" used by client code two reclaim the consumed memory block.
*/
class DataConsumer {
public:
/**
* Returns used (consumed) memory blocks.
* @return the memory block to be reclaimed
*/
virtual MemBlock * getFreeBlock() = 0;
/**
* Inserts a new memory block in the consumers fifo.
* @param mb memory block to be inserted in the consumer fifo and consumed
*/
virtual void pushDataBlock(MemBlock *mb) = 0;
/**
* Destructor
*/
virtual ~DataConsumer() {}
};
/******************************************************************************
* DataFifo.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemManagerClient.hpp"
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/exception/Exception.hpp"
/* A fixed payload FIFO: at creation time, we know how many blocks will go through the FIFO.
The provide block method return true as long as it still needs more block. False when last
block is provided (and throws an exception after that).
*/
class DataFifo : public MemoryManagerClient {
public:
DataFifo(int bn) throw() : m_blocksNeeded(bn), m_freeBlocksProvided(0),
m_dataBlocksPushed(0), m_dataBlocksPopped(0) {};
~DataFifo() throw() { TapeMutexLocker ml(&m_freeBlockProviderProtection); }
/* Memory manager client interface implementation */
virtual bool provideBlock(MemBlock *mb) throw(MemException) {
bool ret;
TapeMutexLocker ml(&m_freeBlockProviderProtection);
{
TapeMutexLocker ml(&m_countersMutex);
if (m_freeBlocksProvided >= m_blocksNeeded)
throw MemException("DataFifo overflow on free blocks");
m_freeBlocksProvided++;
ret = m_freeBlocksProvided < m_blocksNeeded;
}
m_freeBlocks.push(mb);
return ret;
}
virtual bool endOfWork() throw() { return false; }
/* Rest of the data Fifo interface. */
MemBlock * getFreeBlock() throw(castor::exception::Exception) {
return m_freeBlocks.pop();
}
void pushDataBlock(MemBlock *mb) throw(castor::exception::Exception) {
{
TapeMutexLocker ml(&m_countersMutex);
if (m_dataBlocksPushed >= m_blocksNeeded)
throw MemException("DataFifo overflow on data blocks");
}
m_dataBlocks.push(mb);
{
TapeMutexLocker ml(&m_countersMutex);
m_dataBlocksPushed++;
}
}
MemBlock * popDataBlock() throw(castor::exception::Exception) {
MemBlock *ret = m_dataBlocks.pop();
{
TapeMutexLocker ml(&m_countersMutex);
m_dataBlocksPopped++;
}
return ret;
}
bool finished() throw() {
// No need to lock because only one int variable is read.
return m_dataBlocksPopped >= m_blocksNeeded;
}
private:
TapeMutex m_countersMutex;
TapeMutex m_freeBlockProviderProtection;
int m_blocksNeeded;
volatile int m_freeBlocksProvided;
volatile int m_dataBlocksPushed;
volatile int m_dataBlocksPopped;
BlockingQueue<MemBlock *> m_freeBlocks;
BlockingQueue<MemBlock *> m_dataBlocks;
};
/******************************************************************************
* DiskReadFileTask.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
class DiskReadFileTask: public DiskReadTask {
public:
DiskReadFileTask(DataConsumer & destination, int fileId, int nbBlocks): m_fileId(fileId),
m_nbBlocks(nbBlocks), m_fifo(destination) {}
/* Implementation of the DiskReadTask interface*/
virtual bool endOfWork() { return false; }
virtual void execute() {
for (int blockId=0; blockId < m_nbBlocks; blockId++) {
MemBlock * mb = m_fifo.getFreeBlock();
mb->m_fileid = m_fileId;
mb->m_fileBlock = blockId;
m_fifo.pushDataBlock(mb);
}
}
private:
int m_fileId;
int m_nbBlocks;
DataConsumer & m_fifo;
};
/******************************************************************************
* DiskReadTask.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
class DiskReadTask {
public:
virtual bool endOfWork() = 0;
virtual void execute() {
throw MemException("Tring to execute a non-execuatble DiskReadTask");
};
virtual ~DiskReadTask() {}
};
/******************************************************************************
* DiskReadThreadPool.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/daemon/TapeThreading.hpp"
#include <vector>
class DiskReadThreadPool {
public:
DiskReadThreadPool(int nbThread) {
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
}
}
~DiskReadThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
}
void startThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
}
}
void waitThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->waitThreads();
}
}
void push(DiskReadTask *t) { m_tasks.push(t); }
void finish() {
/* Insert one endOfSession per thread */
for (size_t i=0; i<m_threads.size(); i++) {
m_tasks.push(new endOfSession);
}
}
private:
class endOfSession: public DiskReadTask {
virtual bool endOfWork() { return true; }
};
class DiskReadWorkerThread: private TapeThread {
public:
DiskReadWorkerThread(DiskReadThreadPool & manager): m_manager(manager) {}
void startThreads() { TapeThread::start(); }
void waitThreads() { TapeThread::wait(); }
private:
DiskReadThreadPool & m_manager;
virtual void run() {
while(1) {
DiskReadTask * task = m_manager.m_tasks.pop();
bool end = task->endOfWork();
if (!end) task->execute();
delete task;
if (end) return;
}
}
};
BlockingQueue<DiskReadTask *> m_tasks;
std::vector<DiskReadWorkerThread *> m_threads;
};
/******************************************************************************
* DiskWriteFileTask.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
/**
* The DiskWriteFileTask is responsible to write a single file onto disk as part of a recall
* session. Being a consumer of memory blocks, it inherits from the DataConsumer class. It also
* inherits several methods from the DiskWriteTask (TODO: do we really need this base class?).
*/
class DiskWriteFileTask: public DiskWriteTask, public DataConsumer {
public:
/**
* Constructor
* @param fileId: file id of the file to write to disk
* @param blockCount: number of memory blocks that will be used
* @param mm: memory manager of the session
*/
DiskWriteFileTask(int fileId, int blockCount, MemoryManager& mm): m_fifo(blockCount),
m_blockCount(blockCount), m_fileId(fileId),
m_memManager(mm) { mm.addClient(&m_fifo); }
/**
* TODO: Do we need this here?
* @return always false
*/
virtual bool endOfWork() { return false; }
/**
* Return the numebr of files to write to disk
* @return always 1
*/
virtual int files() { return 1; };
/**
* @return the number of memory blocks to be used
*/
virtual int blocks() { return m_blockCount; }
/**
* Main routine: takes each memory block in the fifo and writes it to disk
*/
virtual void execute() {
int blockId = 0;
while(!m_fifo.finished()) {
//printf("+++ In disk write file, id=%d\n",m_fileId);
MemBlock *mb = m_fifo.popDataBlock();
mb->m_fileid = m_fileId;
mb->m_fileBlock = blockId++;
}
}
/**
* Allows client code to return a reusable memory block
* @return the pointer to the memory block that can be reused
*/
virtual MemBlock *getFreeBlock() { return m_fifo.getFreeBlock(); }
/**
* Function used to enqueue a new memory block holding data to be written to disk
* @param mb: corresponding memory block
*/
virtual void pushDataBlock(MemBlock *mb) {
TapeMutexLocker ml(&m_producerProtection);
m_fifo.pushDataBlock(mb);
}
/**
* Function used to wait until the end of the write
*/
virtual void waitCompletion() { volatile TapeMutexLocker ml(&m_producerProtection); }
/**
* Destructor (also waiting for the end of the write operation)
*/
virtual ~DiskWriteFileTask() { volatile TapeMutexLocker ml(&m_producerProtection); }
private:
/**
* The fifo containing the memory blocks holding data to be written to disk
*/
DataFifo m_fifo;
/**
* Number of blocks in the fifo
*/
int m_blockCount;
/**
* File id of the file that will be written to disk
*/
int m_fileId;
/**
* Reference to the Memory Manager in use
*/
MemoryManager & m_memManager;
/**
* Mutex forcing serial access to the fifo
*/
TapeMutex m_producerProtection;
};
/******************************************************************************
* DiskWriteTask.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
/**
* Abstract class describing the interface for a task that wants to write to disk.
* This is inherited exclusively by DiskWriteFileTask.
*/
class DiskWriteTask {
public:
/**
* TODO: see comment on the same function in DiskWriteFileTask.
*/
virtual bool endOfWork() = 0;
/**
* @return the number of memory blocks to be used
*/
virtual int blocks() { return 0; }
/**
* @return the number of files to write to disk
*/
virtual int files() { return 0; }
/**
* Main routine of the task
*/
virtual void execute() {
throw MemException("Trying to execute a non-executable DiskWriteTask");
};
/**
* Wait for the end of the task
*/
virtual void waitCompletion() {};
/**
* Destructor
*/
virtual ~DiskWriteTask() {};
};
/******************************************************************************
* DiskWriteThreadPool.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/daemon/TapeThreading.hpp"
#include "castor/tape/tapeserver/daemon/JobInjector.hpp"
#include <vector>
class DiskWriteThreadPool {
public:
DiskWriteThreadPool(int nbThread, int maxFilesReq, int maxBlocksReq):
m_jobInjector(NULL), m_filesQueued(0), m_blocksQueued(0),
m_maxFilesReq(maxFilesReq), m_maxBlocksReq(maxBlocksReq)
{
for(int i=0; i<nbThread; i++) {
DiskWriteWorkerThread * thr = new DiskWriteWorkerThread(*this);
m_threads.push_back(thr);
}
}
~DiskWriteThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
}
void startThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
}
}
void waitThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->waitThreads();
}