Commit 7c6822fa authored by David COME's avatar David COME
Browse files

Add interface for TapeXSingleThread

Also removed the nested Worker class to make the interface inheriting from Thread
parent ee95a6f9
......@@ -28,44 +28,26 @@
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
#include <iostream>
#include <stdio.h>
class TapeReadSingleThread {
class TapeReadSingleThread : public TapeSingleThreadInterface<TapeReadTask>{
public:
TapeReadSingleThread(castor::tape::drives::DriveInterface & drive): m_workerThread(*this), m_drive(drive) {}
void startThreads() { m_workerThread.startThreads(); }
void waitThreads() { m_workerThread.waitThreads(); }
void push(TapeReadTask * t) { m_tasks.push(t); }
void finish() { m_tasks.push(new endOfSession); }
TapeReadSingleThread(castor::tape::drives::DriveInterface & drive):
TapeSingleThreadInterface<TapeReadTask>(drive) {}
private:
class endOfSession: public TapeReadTask {
virtual bool endOfWork() { return true; }
};
class TapeReadWorkerThread : private castor::tape::threading::Thread {
public:
TapeReadWorkerThread(TapeReadSingleThread & manager): m_manager(manager) {}
void startThreads() { castor::tape::threading::Thread::start(); }
void waitThreads() {
castor::tape::threading::Thread::wait();
}
private:
TapeReadSingleThread & m_manager;
virtual void run() {
virtual void run() {
while(1) {
TapeReadTask * task = m_manager.m_tasks.pop();
TapeReadTask * task = m_tasks.pop();
bool end = task->endOfWork();
if (!end) task->execute(m_manager.m_drive);
if (!end) task->execute(m_drive);
delete task;
if (end) {
printf("End of TapeReadWorkerThread::run()\n");
return;
}
}
}
} m_workerThread;
friend class TapeReadWorkerThread;
castor::tape::drives::DriveInterface & m_drive;
castor::tape::threading::BlockingQueue<TapeReadTask *> m_tasks;
}
};
/*
* File: TapeSingleThreadInterface.hpp
* Author: dcome
*
* Created on March 18, 2014, 4:28 PM
*/
#ifndef TAPESINGLETHREADINTERFACE_HPP
#define TAPESINGLETHREADINTERFACE_HPP
#include "castor/tape/tapeserver/threading/Threading.hpp"
template <class Task> class TapeSingleThreadInterface : private castor::tape::threading::Thread
{
protected:
class endOfSession: public Task {
virtual bool endOfWork() { return true; }
};
castor::tape::threading::BlockingQueue<Task *> m_tasks;
castor::tape::drives::DriveInterface & m_drive;
public:
void finish() { m_tasks.push(new endOfSession); }
void push(Task * t) { m_tasks.push(t); }
virtual void startThreads(){ start(); }
virtual void waitThreads() { wait(); }
TapeSingleThreadInterface(castor::tape::drives::DriveInterface & drive):
m_drive(drive)
{}
};
#endif /* TAPESINGLETHREADINTERFACE_HPP */
......@@ -30,65 +30,48 @@
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/MigrationJob.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
#include <iostream>
#include <stdio.h>
class TapeWriteSingleThread {
class TapeWriteSingleThread : public TapeSingleThreadInterface<TapeWriteTask> {
public:
TapeWriteSingleThread(castor::tape::drives::DriveInterface & drive, MigrationReportPacker & repPacker,
int filesBeforeFlush, int blockBeforeFlush):
m_workerThread(*this), m_drive(drive), m_filesBeforeFlush(filesBeforeFlush),
m_blocksBeforeFlush(blockBeforeFlush), m_reportPacker(repPacker) {}
void startThreads() { m_workerThread.startThreads(); }
void waitThreads() { m_workerThread.waitThreads(); }
void push(TapeWriteTask * t) { m_tasks.push(t); }
void finish() { m_tasks.push(new endOfSession); }
TapeSingleThreadInterface<TapeWriteTask>(drive),
m_filesBeforeFlush(filesBeforeFlush),m_blocksBeforeFlush(blockBeforeFlush), m_reportPacker(repPacker) {}
private:
class endOfSession: public TapeWriteTask {
virtual bool endOfWork() { return true; }
};
class TapeWriteWorkerThread : private castor::tape::threading::Thread {
public:
TapeWriteWorkerThread(TapeWriteSingleThread & manager): m_manager(manager) {}
void startThreads() { start(); }
void waitThreads() {
wait();
}
private:
TapeWriteSingleThread & m_manager;
virtual void run() {
virtual void run() {
int blocks=0;
int files=0;
while(1) {
TapeWriteTask * task = m_manager.m_tasks.pop();
TapeWriteTask * task = m_tasks.pop();
bool end = task->endOfWork();
if (!end) task->execute(m_manager.m_drive);
m_manager.m_reportPacker.reportCompletedJob(MigrationJob(-1, task->fSeq(), -1));
if (!end) task->execute(m_drive);
m_reportPacker.reportCompletedJob(MigrationJob(-1, task->fSeq(), -1));
files+=task->files();
blocks+=task->blocks();
if (files >= m_manager.m_filesBeforeFlush ||
blocks >= m_manager.m_blocksBeforeFlush) {
if (files >= m_filesBeforeFlush ||
blocks >= m_blocksBeforeFlush) {
printf("Flushing after %d files and %d blocks\n", files, blocks);
m_manager.m_reportPacker.reportFlush();
m_reportPacker.reportFlush();
files=0;
blocks=0;
m_manager.m_drive.flush();
m_drive.flush();
}
delete task;
if (end) {
printf("End of TapeWriteWorkerThread::run() (flushing)\n");
m_manager.m_drive.flush();
m_manager.m_reportPacker.reportEndOfSession();
m_drive.flush();
m_reportPacker.reportEndOfSession();
return;
}
}
}
} m_workerThread;
friend class TapeWriteWorkerThread;
castor::tape::drives::DriveInterface & m_drive;
castor::tape::threading::BlockingQueue<TapeWriteTask *> m_tasks;
int m_filesBeforeFlush;
int m_blocksBeforeFlush;
MigrationReportPacker & m_reportPacker;
};
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment