Commit 118dcca5 authored by David COME's avatar David COME
Browse files

Remove all traces of TapeQueue and TapeThread

We are now using the matching files and classes frpm castor/tape/threading
parent b2e43eef
......@@ -19,7 +19,7 @@ target_link_libraries(tapeserverd
add_executable(tapeserver-mm tapeserver-mm.cpp)
target_link_libraries(tapeserver-mm TapeDrive Exception SCSI System Utils File castorcommon castorclient)
target_link_libraries(tapeserver-mm TapeDrive Exception SCSI System Utils File castorcommon castorclient castorTapeServerThreading)
add_library(tapeserver ClientInterface.cpp MountSession.cpp)
......
......@@ -24,7 +24,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemManagerClient.hpp"
#include "castor/tape/tapeserver/daemon/Exception.hpp"
......@@ -40,14 +40,14 @@ public:
DataFifo(int bn) throw() : m_blocksNeeded(bn), m_freeBlocksProvided(0),
m_dataBlocksPushed(0), m_dataBlocksPopped(0) {};
~DataFifo() throw() { TapeMutexLocker ml(&m_freeBlockProviderProtection); }
~DataFifo() throw() { castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection); }
/* Memory manager client interface implementation */
virtual bool provideBlock(MemBlock *mb) throw(MemException) {
bool ret;
TapeMutexLocker ml(&m_freeBlockProviderProtection);
castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection);
{
TapeMutexLocker ml(&m_countersMutex);
castor::tape::threading::MutexLocker ml(&m_countersMutex);
if (m_freeBlocksProvided >= m_blocksNeeded)
throw MemException("DataFifo overflow on free blocks");
m_freeBlocksProvided++;
......@@ -65,13 +65,13 @@ public:
void pushDataBlock(MemBlock *mb) throw(castor::exception::Exception) {
{
TapeMutexLocker ml(&m_countersMutex);
castor::tape::threading::MutexLocker ml(&m_countersMutex);
if (m_dataBlocksPushed >= m_blocksNeeded)
throw MemException("DataFifo overflow on data blocks");
}
m_dataBlocks.push(mb);
{
TapeMutexLocker ml(&m_countersMutex);
castor::tape::threading::MutexLocker ml(&m_countersMutex);
m_dataBlocksPushed++;
}
}
......@@ -79,7 +79,7 @@ public:
MemBlock * popDataBlock() throw(castor::exception::Exception) {
MemBlock *ret = m_dataBlocks.pop();
{
TapeMutexLocker ml(&m_countersMutex);
castor::tape::threading::MutexLocker ml(&m_countersMutex);
m_dataBlocksPopped++;
}
return ret;
......@@ -87,16 +87,17 @@ public:
bool finished() throw() {
// No need to lock because only one int variable is read.
//TODO : are we sure the operation is atomic ? It is plateform dependant
return m_dataBlocksPopped >= m_blocksNeeded;
}
private:
TapeMutex m_countersMutex;
TapeMutex m_freeBlockProviderProtection;
castor::tape::threading::Mutex m_countersMutex;
castor::tape::threading::Mutex m_freeBlockProviderProtection;
int m_blocksNeeded;
volatile int m_freeBlocksProvided;
volatile int m_dataBlocksPushed;
volatile int m_dataBlocksPopped;
BlockingQueue<MemBlock *> m_freeBlocks;
BlockingQueue<MemBlock *> m_dataBlocks;
castor::tape::threading::BlockingQueue<MemBlock *> m_freeBlocks;
castor::tape::threading::BlockingQueue<MemBlock *> m_dataBlocks;
};
......@@ -25,8 +25,8 @@
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/daemon/TapeThreading.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include <vector>
class DiskReadThreadPool {
......@@ -67,11 +67,11 @@ private:
class endOfSession: public DiskReadTask {
virtual bool endOfWork() { return true; }
};
class DiskReadWorkerThread: private TapeThread {
class DiskReadWorkerThread: private castor::tape::threading::Thread {
public:
DiskReadWorkerThread(DiskReadThreadPool & manager): m_manager(manager) {}
void startThreads() { TapeThread::start(); }
void waitThreads() { TapeThread::wait(); }
void startThreads() { start(); }
void waitThreads() { wait(); }
private:
DiskReadThreadPool & m_manager;
virtual void run() {
......@@ -84,6 +84,6 @@ private:
}
}
};
BlockingQueue<DiskReadTask *> m_tasks;
castor::tape::threading::BlockingQueue<DiskReadTask *> m_tasks;
std::vector<DiskReadWorkerThread *> m_threads;
};
......@@ -87,19 +87,19 @@ public:
* @param mb: corresponding memory block
*/
virtual void pushDataBlock(MemBlock *mb) {
TapeMutexLocker ml(&m_producerProtection);
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.pushDataBlock(mb);
}
/**
* Function used to wait until the end of the write
*/
virtual void waitCompletion() { volatile TapeMutexLocker ml(&m_producerProtection); }
virtual void waitCompletion() { volatile castor::tape::threading::MutexLocker ml(&m_producerProtection); }
/**
* Destructor (also waiting for the end of the write operation)
*/
virtual ~DiskWriteFileTask() { volatile TapeMutexLocker ml(&m_producerProtection); }
virtual ~DiskWriteFileTask() { volatile castor::tape::threading::MutexLocker ml(&m_producerProtection); }
private:
......@@ -126,5 +126,5 @@ private:
/**
* Mutex forcing serial access to the fifo
*/
TapeMutex m_producerProtection;
castor::tape::threading::Mutex m_producerProtection;
};
......@@ -25,8 +25,8 @@
#pragma once
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/daemon/TapeThreading.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/daemon/JobInjector.hpp"
#include <vector>
......@@ -61,7 +61,7 @@ public:
}
void push(DiskWriteTask *t) {
{
TapeMutexLocker ml(&m_counterProtection);
castor::tape::threading::MutexLocker ml(&m_counterProtection);
m_filesQueued += t->files();
m_blocksQueued += t->blocks();
}
......@@ -93,7 +93,7 @@ private:
DiskWriteTask * popAndRequestMoreJobs() {
DiskWriteTask * ret = m_tasks.pop();
{
TapeMutexLocker ml(&m_counterProtection);
castor::tape::threading::MutexLocker ml(&m_counterProtection);
/* We are about to go to empty: request a last call job injection */
if(m_filesQueued == 1 && ret->files()) {
printf("In DiskWriteTask::popAndRequestMoreJobs(), requesting last call: files=%d, blocks=%d, ret->files=%d, ret->blocks=%d, maxFiles=%d, maxBlocks=%d\n",
......@@ -115,11 +115,11 @@ private:
class endOfSession: public DiskWriteTask {
virtual bool endOfWork() { return true; }
};
class DiskWriteWorkerThread: private TapeThread {
class DiskWriteWorkerThread: private castor::tape::threading::Thread {
public:
DiskWriteWorkerThread(DiskWriteThreadPool & manager): m_manager(manager) {}
void startThreads() { TapeThread::start(); }
void waitThreads() { TapeThread::wait(); }
void startThreads() { castor::tape::threading::Thread::start(); }
void waitThreads() { castor::tape::threading::Thread::wait(); }
private:
DiskWriteThreadPool & m_manager;
virtual void run() {
......@@ -135,9 +135,9 @@ private:
}
}
};
BlockingQueue<DiskWriteTask *> m_tasks;
castor::tape::threading::BlockingQueue<DiskWriteTask *> m_tasks;
std::vector<DiskWriteWorkerThread *> m_threads;
TapeMutex m_counterProtection;
castor::tape::threading::Mutex m_counterProtection;
JobInjector * m_jobInjector;
int m_filesQueued;
int m_blocksQueued;
......
......@@ -26,8 +26,8 @@
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemManagerClient.hpp"
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/daemon/TapeThreading.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/exception/Exception.hpp"
#include <iostream>
......@@ -35,7 +35,7 @@
* The memory manager is responsible for allocating memory blocks and distributing
* the free ones around to any class in need.
*/
class MemoryManager: private TapeThread {
class MemoryManager: private castor::tape::threading::Thread {
public:
/**
......@@ -61,14 +61,14 @@ public:
* Start serving clients (in the dedicated thread)
*/
void startThreads() throw(castor::exception::Exception) {
TapeThread::start();
castor::tape::threading::Thread::start();
}
/**
* Waiting for clients to finish (in the dedicated thread)
*/
void waitThreads() throw(castor::exception::Exception) {
TapeThread::wait();
castor::tape::threading::Thread::wait();
}
/**
......@@ -102,7 +102,7 @@ public:
~MemoryManager() throw() {
// Make sure the thread is finished: this should be done by the caller,
// who should have called waitThreads.
// TapeThread::wait();
// castor::tape::threading::Thread::wait();
// we expect to be called after all users are finished. Just "free"
// the memory blocks we still have.
try {
......@@ -110,7 +110,7 @@ public:
m_freeBlocks.tryPop();
}
}
catch (BlockingQueue<MemBlock>::noMore) {
catch (castor::tape::threading::noMore) {
//done
}
}
......@@ -134,13 +134,13 @@ private:
/**
* Container for the free blocks
*/
BlockingQueue<MemBlock *> m_freeBlocks;
castor::tape::threading::BlockingQueue<MemBlock *> m_freeBlocks;
/**
* The client queue: we will feed them as soon as blocks
* become free. This is done in a dedicated thread.
*/
BlockingQueue<MemoryManagerClient *> m_clientQueue;
castor::tape::threading::BlockingQueue<MemoryManagerClient *> m_clientQueue;
/**
* Thread routine: pops a client and provides him blocks until he is happy!
......
......@@ -25,7 +25,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/MockTapeGateway.hpp"
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/MigrationJob.hpp"
#include <list>
......@@ -36,24 +36,24 @@ public:
void reportCompletedJob(MigrationJob mj) {
Report rep;
rep.migrationJob = mj;
TapeMutexLocker ml(&m_producterProtection);
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep);
}
void reportFlush() {
Report rep;
rep.flush = true;
TapeMutexLocker ml(&m_producterProtection);
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep);
}
void reportEndOfSession() {
Report rep;
rep.EoSession = true;
TapeMutexLocker ml(&m_producterProtection);
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep);
}
void startThreads() { m_workerThread.start(); }
void waitThread() { m_workerThread.wait(); }
virtual ~MigrationReportPacker() { TapeMutexLocker ml(&m_producterProtection); }
virtual ~MigrationReportPacker() { castor::tape::threading::MutexLocker ml(&m_producterProtection); }
private:
class Report {
public:
......@@ -63,7 +63,7 @@ private:
bool EoSession;
MigrationJob migrationJob;
};
class WorkerThread: public TapeThread {
class WorkerThread: public castor::tape::threading::Thread {
public:
WorkerThread(MigrationReportPacker& parent): m_parent(parent) {}
void run() {
......@@ -85,7 +85,7 @@ private:
} m_workerThread;
friend class WorkerThread;
MockTapeGateway & m_tapeGateway;
BlockingQueue<Report> m_fifo;
castor::tape::threading::BlockingQueue<Report> m_fifo;
std::list<MigrationJob> m_currentReport;
TapeMutex m_producterProtection;
castor::tape::threading::Mutex m_producterProtection;
};
......@@ -42,11 +42,11 @@ public:
m_tapeReader(tapeReader), m_diskWriter(diskWriter),
m_tapeGateway(tapeGateway) {}
~RecallJobInjector() { TapeMutexLocker ml(&m_producerProtection); }
~RecallJobInjector() { castor::tape::threading::MutexLocker ml(&m_producerProtection); }
virtual void requestInjection(int maxFiles, int maxBlocks, bool lastCall) {
printf("RecallJobInjector::requestInjection()\n");
TapeMutexLocker ml(&m_producerProtection);
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request(maxFiles, maxBlocks, lastCall));
}
void waitThreads() {
......@@ -64,7 +64,7 @@ private:
m_tapeReader.push(trt);
}
}
class WorkerThread: public TapeThread {
class WorkerThread: public castor::tape::threading::Thread {
public:
WorkerThread(RecallJobInjector & rji): m_parent(rji) {}
void run() {
......@@ -91,7 +91,7 @@ private:
Request req = m_parent.m_queue.tryPop();
printf("In RecallJobInjector::WorkerThread::run(): popping extra request (lastCall=%d)\n", req.lastCall);
}
} catch (BlockingQueue<Request>::noMore) {
} catch (castor::tape::threading::noMore) {
printf("In RecallJobInjector::WorkerThread::run(): Drained the request queue. We're now empty. Finishing.\n");
}
}
......@@ -111,6 +111,6 @@ private:
TapeReadSingleThread & m_tapeReader;
DiskWriteThreadPool & m_diskWriter;
MockTapeGateway & m_tapeGateway;
TapeMutex m_producerProtection;
BlockingQueue<Request> m_queue;
castor::tape::threading::Mutex m_producerProtection;
castor::tape::threading::BlockingQueue<Request> m_queue;
};
/******************************************************************************
* TapeQueue.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/TapeThreading.hpp"
#include <queue>
template<class C>
class LinkedListQueue {
public:
LinkedListQueue(): m_first(NULL), m_last(NULL), m_size(0) {}
void push(C e) {
elem * ins = new elem(e);
if (m_last) {
m_last->next = ins;
m_last = ins;
} else {
m_first = m_last = ins;
}
m_size++;
}
C front() {
return m_first->val;
}
void pop() {
elem * old = m_first;
m_first = m_first->next;
if (!m_first) m_last = NULL;
m_size--;
delete old;
}
size_t size() { return m_size; }
private:
class elem {
public:
elem(C v): val(v), next(NULL) {}
C val;
elem * next;
};
elem * m_first;
elem * m_last;
size_t m_size;
};
template<class C>
//class TapeQueue: public LinkedListQueue<C>{};
class TapeQueue: public std::queue<C>{};
template<class C>
class BlockingQueue {
public:
BlockingQueue(): m_content(0) {}
class noMore {};
void push(C e) {
{
TapeMutexLocker ml(&m_mutex);
m_queue.push(e);
m_content++;
}
m_sem.release();
}
C pop() {
m_sem.acquire();
return popCriticalSection();
}
C tryPop() {
if (!m_sem.tryAcquire()) throw noMore();
return popCriticalSection();
}
size_t size() { return m_queue.size(); }
~BlockingQueue() {}
private:
TapeQueue<C> m_queue;
TapeSemaphore m_sem;
TapeMutex m_mutex;
int m_content;
C popCriticalSection() {
TapeMutexLocker ml(&m_mutex);
C ret = m_queue.front();
m_queue.pop();
m_content--;
return ret;
}
};
......@@ -24,9 +24,9 @@
#pragma once
#include "castor/tape/tapeserver/daemon/TapeQueue.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeThreading.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include <iostream>
#include <stdio.h>
......@@ -43,12 +43,12 @@ private:
class endOfSession: public TapeReadTask {
virtual bool endOfWork() { return true; }
};
class TapeReadWorkerThread : private TapeThread {
class TapeReadWorkerThread : private castor::tape::threading::Thread {
public:
TapeReadWorkerThread(TapeReadSingleThread & manager): m_manager(manager) {}
void startThreads() { TapeThread::start(); }
void startThreads() { castor::tape::threading::Thread::start(); }
void waitThreads() {
TapeThread::wait();
castor::tape::threading::Thread::wait();
}
private:
TapeReadSingleThread & m_manager;
......@@ -67,5 +67,5 @@ private:
} m_workerThread;
friend class TapeReadWorkerThread;
castor::tape::drives::DriveInterface & m_drive;
BlockingQueue<TapeReadTask *> m_tasks;
castor::tape::threading::BlockingQueue<TapeReadTask *> m_tasks;
};
/******************************************************************************
* TapeThreading.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/exception/Exception.hpp"
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
/* WARNING WARNING TODO: this is a quick replacement of Qt's threading primives
* in an attempt to replace Qt's primitives which might give problems.
* There is no error handling!!!!!! */
/* Simplified version of Qt's Qthread object
* We are limited here to:
* start
* run
* wait
*/
class TapeThread {
public:
void start() {
if (pthread_create(&m_thread, NULL, pthread_runner, this)) {
throw MemException("Failed to start thread");
}
}
void wait() {
pthread_join(m_thread, NULL);
}
virtual void run () = 0;
virtual ~TapeThread () {}
private:
pthread_t m_thread;
static void * pthread_runner (void * arg) {
TapeThread * _this = (TapeThread *) arg;
try {
_this->run();
} catch (std::exception & e) {
printf ("Thread finished with uncaught exception: %s\n", e.what());
}
return NULL;
}
};
class TapeMutex {
public:
TapeMutex() {
m_deleted = false;
pthread_mutex_init(&m_mutex, NULL);
//printf("Initialised a mutex, TapeMutex::this=0x%x\n", this);
}
~TapeMutex() {
pthread_mutex_destroy(&m_mutex);
//printf("Destroyed a mutex, TapeMutex::this=0x%x\n", this);
m_deleted = true;
}
void lock() {
if (m_deleted) *((volatile char*) NULL) = 0;
errnoExeptLauncher(pthread_mutex_lock(&m_mutex), &m_mutex);
}
void unlock() {
if (m_deleted) *((volatile char*) NULL) = 0;
errnoExeptLauncher(pthread_mutex_unlock(&m_mutex), &m_mutex);
}
private:
pthread_mutex_t m_mutex;
bool m_deleted;
};