Commit 085ebea2 authored by David COME's avatar David COME
Browse files

Whatever WIP again

parent fd041b19
/******************************************************************************
* MigrationTaskInjector.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
namespace{
castor::tape::tapegateway::FileToMigrateStruct* removeOwningList(castor::tape::tapegateway::FileToMigrateStruct* ptr){
ptr->setFilesToMigrateList(0);
return ptr;
}
}
namespace castor{
namespace tape{
namespace tapeserver{
namespace daemon {
MigrationTaskInjector::MigrationTaskInjector(MemoryManager & mm,
DiskThreadPoolInterface<DiskReadTaskInterface> & diskReader,
TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,client::ClientInterface& client,
castor::log::LogContext lc):
m_thread(*this),m_memManager(mm),m_tapeWriter(tapeWriter),
m_diskReader(diskReader),m_client(client)
{
}
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs
*/
void MigrationTaskInjector::injectBulkMigrations(const std::vector<tapegateway::FileToMigrateStruct*>& jobs){
const u_signed64 blockCapacity = m_memManager->blockCapacity();
for(const std::vector<tapegateway::FileToMigrateStruct*>::const_iterator it= jobs.begin();it!=jobs.end();++it){
const u_signed64 fileSize = (*it)->fileSize();
const u_signed64 neededBlock = fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1);
TapeWriteTask *twt = new TapeWriteTask(,neededBlock,mm);
DiskReadTask *drt = new DiskReadTask(*twt,removeOwningList((*it)->clone()));
}
}
/**
* Wait for the inner thread to finish
*/
void MigrationTaskInjector::waitThreads(){
m_thread.wait();
}
/**
* Start the inner thread
*/
void MigrationTaskInjector::startThreads(){
m_thread.start();
}
MigrationTaskInjector::WorkerThread::WorkerThread(MigrationTaskInjector & rji): _this(rji) {}
void MigrationTaskInjector::WorkerThread::run(){}
} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor
/******************************************************************************
* MigrationTaskInjector.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
#include "castor/log/LogContext.hpp"
namespace castor{
namespace tape{
namespace tapeserver{
namespace daemon {
/**
* This classis responsible for creating the tasks in case of a recall job
*/
class MigrationTaskInjector: public TaskInjector {
public:
MigrationTaskInjector(MemoryManager & mm,
DiskThreadPoolInterface<DiskReadTaskInterface> & diskReader,
TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,client::ClientInterface& client,
castor::log::LogContext lc);
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs
*/
void injectBulkMigrations(const std::vector<castor::tape::tapegateway::FileToMigrateStruct*>& jobs);
/**
* Wait for the inner thread to finish
*/
void waitThreads();
/**
* Start the inner thread
*/
void startThreads();
private:
class WorkerThread: public castor::tape::threading::Thread {
public:
WorkerThread(MigrationTaskInjector & rji): _this(rji) {}
virtual void run();
private:
MigrationTaskInjector & _this;
} m_thread;
MemoryManager & m_memManager;
TapeSingleThreadInterface<TapeWriteTask>& m_tapeWriter;
DiskThreadPoolInterface<DiskReadTaskInterface>& m_diskReader;
client::ClientInterface& m_client;
/**
* utility member to log some pieces of information
*/
castor::log::LogContext m_lc;
castor::tape::threading::Mutex m_producerProtection;
castor::tape::threading::BlockingQueue<Request> m_queue;
};
} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor
......@@ -58,7 +58,7 @@ void RecallTaskInjector::injectBulkRecalls(const std::vector<castor::tape::tapeg
m_lc.log(LOG_INFO, "Logged file to recall");
DiskWriteTask * dwt = new DiskWriteTask(removeOwningList(dynamic_cast<tape::tapegateway::FileToRecallStruct*>((*it)->clone())) ,m_memManager);
DiskWriteTask * dwt = new DiskWriteTask(removeOwningList((*it)->clone()) ,m_memManager);
TapeReadFileTask * trt = new TapeReadFileTask(m_memManager,*dwt, (*it)->fseq(), blockID(**it));
m_diskWriter.push(dwt);
......
......@@ -41,34 +41,49 @@ namespace daemon {
class TapeWriteSingleThread : public TapeSingleThreadInterface<TapeWriteTaskInterface> {
public:
TapeWriteSingleThread(castor::tape::drives::DriveInterface & drive, MigrationReportPacker & repPacker,
int filesBeforeFlush, int blockBeforeFlush):
int filesBeforeFlush, int blockBeforeFlush,castor::log::LogContext lc):
TapeSingleThreadInterface<TapeWriteTaskInterface>(drive),
m_filesBeforeFlush(filesBeforeFlush),m_blocksBeforeFlush(blockBeforeFlush), m_reportPacker(repPacker) {}
m_filesBeforeFlush(filesBeforeFlush),m_blocksBeforeFlush(blockBeforeFlush),
m_drive(drive),m_reportPacker(repPacker),m_lc(lc)
{}
private:
virtual void run() {
// First we have to initialise the tape read session
std::auto_ptr<castor::tape::tapeFile::ReadSession> rs;
try {
rs.reset(new castor::tape::tapeFile::ReadSession(m_drive, m_vid));
m_lc.log(LOG_INFO, "Tape Write session session successfully started");
}
catch (castor::exception::Exception & ex) {
m_lc.log(LOG_ERR, "Failed to start tape read session");
// TODO: log and unroll the session
// TODO: add an unroll mode to the tape read task. (Similar to exec, but pushing blocks marked in error)
}
int blocks=0;
int files=0;
std::auto_ptr<TapeWriteTaskInterface> task ;
std::auto_ptr<TapeWriteTaskInterface> task ;
while(1) {
task.reset(m_tasks.pop());
if(NULL!=task.get()) {
task->execute(m_drive);
files+=1;
task->execute(*rs);
files++;
blocks+=task->blocks();
if (files >= m_filesBeforeFlush ||
blocks >= m_blocksBeforeFlush) {
printf("Flushing after %d files and %d blocks\n", files, blocks);
if (files >= m_filesBeforeFlush || blocks >= m_blocksBeforeFlush) {
m_drive.flush();
log::LogContext::ScopedParam sp0(m_lc, log::Param("files", files));
log::LogContext::ScopedParam sp1(m_lc, log::Param("blocks", blocks));
m_lc.log(LOG_INFO,"Flushing after files and blocks");
m_reportPacker.reportFlush();
files=0;
blocks=0;
m_drive.flush();
}
}
else{
printf("End of TapeWriteWorkerThread::run() (flushing)\n");
m_lc.log(LOG_INFO,"End of TapeWriteWorkerThread::run() (flushing");
m_drive.flush();
m_reportPacker.reportEndOfSession();
return;
......@@ -79,6 +94,8 @@ private:
int m_filesBeforeFlush;
int m_blocksBeforeFlush;
castor::tape::drives::DriveInterface& m_drive;
MigrationReportPacker & m_reportPacker;
castor::log::LogContext m_lc;
};
}}}}
......@@ -28,6 +28,7 @@
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
/*Use RAII to make sure the memory block is released
*(ie pushed back to the memory manager) in any case (exception or not)
......@@ -51,8 +52,8 @@ namespace tapeserver {
namespace daemon {
TapeWriteTask::TapeWriteTask(int fSeq, int blockCount, MemoryManager& mm):
m_fSeq(fSeq),m_memManager(mm), m_fifo(blockCount), m_blockCount(blockCount)
TapeWriteTask::TapeWriteTask(int fSeq, int blockCount, tapegateway::FileToMigrateStruct* file,MemoryManager& mm):
m_fSeq(fSeq),m_fileToMigrate(file),m_memManager(mm), m_fifo(blockCount),m_blockCount(blockCount)
{
mm.addClient(&m_fifo);
}
......@@ -67,34 +68,53 @@ namespace daemon {
}
void TapeWriteTask::execute(castor::tape::drives::DriveInterface & td,castor::log::LogContext& lc) {
void TapeWriteTask::execute(castor::tape::tapeFile::WriteSession & session,castor::log::LogContext& lc) {
using castor::log::LogContext;
using castor::log::Param;
std::auto_ptr<castor::tape::tapeFile::WriteFile> output;
try{
output.reset(new tape::tapeFile::WriteFile(&session, *m_fileToMigrate));
lc.log(LOG_DEBUG, "Successfully opened the tape file for writing");
}catch(const castor::exception::Exception & ex){
lc.log(LOG_ERR, "Failed to open tape file for writing");
throw;
}
int blockId = 0;
while(!m_fifo.finished()) {
MemBlock* const mb = m_fifo.popDataBlock();
if(/*m_migratingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| */blockId != mb->m_fileBlock || mb->m_failed ){
LogContext::ScopedParam sp[]={
//LogContext::ScopedParam(lc, Param("expected_NSFILEID",m_recallingFile->fileid())),
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->m_failed))
};
tape::utils::suppresUnusedVariable(sp);
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
try {
while(!m_fifo.finished()) {
MemBlock* const mb = m_fifo.popDataBlock();
AutoReleaseBlock releaser(mb,m_memManager);
if(/*m_migratingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
* || */blockId != mb->m_fileBlock || mb->m_failed ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->m_failed))
};
tape::utils::suppresUnusedVariable(sp);
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
}
mb->m_payload.write(output);
++blockId;
}
//mb->m_payload.write(td);
m_memManager.releaseBlock(mb);
++blockId;
}
}
catch(const castor::tape::Exception& e){
lc.log(LOG_ERR,"Circulating blocks into TapeWriteTask::execute");
while(!m_fifo.finished()) {
MemBlock* const mb = m_fifo.popDataBlock();
if(!mb->m_failed){
lc.log(LOG_ERR,"Expecting a failed Memblock, did not get one");
}
m_memManager.releaseBlock(mb);
}
}
}
MemBlock * TapeWriteTask::getFreeBlock() {
return m_fifo.getFreeBlock();
}
......
......@@ -28,6 +28,7 @@
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/log/LogContext.hpp"
namespace castor {
......@@ -49,7 +50,7 @@ public:
* @param blockCount: number of memory blocks (TODO:?)
* @param mm: reference to the memory manager in use
*/
TapeWriteTask(int fSeq, int blockCount, MemoryManager& mm);
TapeWriteTask(int fSeq, int blockCount, tape::tapegateway::FileToMigrateStruct* file,MemoryManager& mm);
/**
......@@ -66,7 +67,7 @@ public:
* Main execution routine
* @param td: tape drive object which will handle the file
*/
virtual void execute(castor::tape::drives::DriveInterface & td,castor::log::LogContext& lc);
virtual void execute(castor::tape::tapeFile::WriteSession & session,castor::log::LogContext& lc);
/**
* Used to reclaim used memory blocks
......@@ -86,14 +87,14 @@ public:
virtual ~TapeWriteTask();
private:
// std::auto_ptr<tapegateway::FileToMigrateStruct> m_migratingFile;
/**
* The file sequence number of the file to be written on tape
*/
int m_fSeq;
std::auto_ptr<tapegateway::FileToMigrateStruct> m_fileToMigrate;
/**
* reference to the memory manager in use
*/
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment