Commit ded702c6 authored by Eric Cano's avatar Eric Cano
Browse files

Catch-up with parallel development.

parents 0c1de64e f66d12a2
......@@ -27,7 +27,7 @@
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemManagerClient.hpp"
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/exception/Exception.hpp"
namespace castor {
......@@ -48,13 +48,13 @@ public:
~DataFifo() throw() { castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection); }
/* Memory manager client interface implementation */
virtual bool provideBlock(MemBlock *mb) throw(MemException) {
virtual bool provideBlock(MemBlock *mb) {
bool ret;
castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection);
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
if (m_freeBlocksProvided >= m_blocksNeeded)
throw MemException("DataFifo overflow on free blocks");
throw castor::tape::exceptions::MemException("DataFifo overflow on free blocks");
m_freeBlocksProvided++;
ret = m_freeBlocksProvided < m_blocksNeeded;
}
......@@ -71,7 +71,7 @@ public:
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
if (m_dataBlocksPushed >= m_blocksNeeded)
throw MemException("DataFifo overflow on data blocks");
throw castor::tape::exceptions::MemException("DataFifo overflow on data blocks");
}
m_dataBlocks.push(mb);
{
......
/******************************************************************************
* Exception.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include <string>
#include <errno.h>
#include <string.h>
#include <stdio.h>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class MemException: public castor::tape::Exception {
public:
MemException(const std::string & what): Exception(what) {}
virtual ~MemException() throw() {}
};
}
}
}
}
......@@ -24,7 +24,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include <memory>
#include "castor/tape/tapeserver/daemon/Payload.hpp"
......@@ -39,26 +39,35 @@ namespace daemon {
*/
class MemBlock {
public:
static const int uninitialised_value = -1;
/**
* COnstrucor
* @param id the block ID for its whole life
* @param capacity the capacity (in byte) of the embed payload
*/
MemBlock(const int id, const size_t capacity) :
m_memoryBlockId(id),m_payload(capacity){
reset();
}
/**
* Mark this block as failed ie
* m_failed is true, m_fileBlock and m_tapeFileBlock are set at -1
* Other members do not change
*/
void markAsFailed(){
m_failed = true;
m_fileBlock = -1;
m_tapeFileBlock = -1;
}
/**
* Reset the values of all the
* Reset all the members.
* Numerical ones are set at -1.and m_failed to false.
*/
void reset() throw() {
m_fileid = uninitialised_value;
m_fileBlock = uninitialised_value;
m_fSeq = uninitialised_value;
m_tapeFileBlock = uninitialised_value;
m_fileid = -1;
m_fileBlock = -1;
m_fSeq = -1;
m_tapeFileBlock = -1;
m_failed=false;
}
/** Unique memory block id */
......
......@@ -25,7 +25,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/exception/Exception.hpp"
namespace castor {
......
......@@ -43,7 +43,7 @@ public:
Payload(size_t capacity):
m_payload(new (std::nothrow) unsigned char[capacity]),m_totalCapacity(capacity),m_size(0) {
if(NULL == m_payload) {
throw MemException("Failed to allocate memory for a new MemBlock!");
throw castor::tape::exceptions::MemException("Failed to allocate memory for a new MemBlock!");
}
}
~Payload(){
......@@ -96,7 +96,7 @@ public:
err << "Trying to read a tape file block with too little space left: BlockSize="
<< from.getBlockSize() << " remainingFreeSpace=" << remainingFreeSpace()
<< " (totalSize=" << m_totalCapacity << ")";
throw MemException(err.str());
throw castor::tape::exceptions::MemException(err.str());
}
size_t readSize;
try {
......
......@@ -37,16 +37,36 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* This class will execute the different tape read tasks.
*
*/
class TapeReadSingleThread : public TapeSingleThreadInterface<TapeReadTaskInterface>{
public:
/**
*
* @param drive The drive which holds all we need in order to read later data from it
* @param vid Volume ID (tape number)
* @param maxFilesRequest : the maximul number of file the task injector may
* ask to the client in a single requiest, this is used for the feedback loop
* @param lc : log context, for logging purpose
*/
TapeReadSingleThread(castor::tape::drives::DriveInterface & drive,
const std::string vid, uint64_t maxFilesRequest,
castor::log::LogContext & lc):
TapeSingleThreadInterface<TapeReadTaskInterface>(drive, vid, lc),
m_maxFilesRequest(maxFilesRequest) {}
void setTaskInjector(TaskInjector * ti) { m_taskInjector = ti; }
private:
/**
* Pop a task from its tasks and if there is not enought tasks left, it will
* ask the task injector for more
* @return m_tasks.pop();
*/
TapeReadTaskInterface * popAndRequestMoreJobs() {
castor::tape::threading::BlockingQueue<TapeReadTaskInterface *>::valueRemainingPair
vrp = m_tasks.popGetSize();
......@@ -98,6 +118,8 @@ private:
}
uint64_t m_maxFilesRequest;
castor::tape::tapeserver::daemon::TaskInjector * m_taskInjector;
};
}
}
......
......@@ -35,19 +35,37 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* This class is in charge of
*
*/
class TapeReadTask: public TapeReadTaskInterface {
public:
/**
* COnstructor
* @param ftr The file being recalled. We acquire the ownership on the pointer
* @param destination the task that will consume the memory blocks
* @param mm The memory manager to get free block
*/
TapeReadTask(castor::tape::tapegateway::FileToRecallStruct * ftr,
DataConsumer & destination, RecallMemoryManager & mm):
m_fileToRecall(ftr), m_fifo(destination), m_mm(mm) {}
/**
* @param rs the read session holding all we need to be able to read from the tape
* @param lc the log context for .. logging purpose
* The actual function that will do the job.
* The main loop is :
* Acquire a free memory block from the memory manager , fill it, push it
*/
virtual void execute(castor::tape::tapeFile::ReadSession & rs,
castor::log::LogContext & lc) {
using castor::log::Param;
typedef castor::log::LogContext::ScopedParam ScopedParam;
// Placeholder for the tape file read
// Set the common context for all the omming logs (file info)
// Set the common context for all the coming logs (file info)
ScopedParam sp0(lc, Param("NSHOSTNAME", m_fileToRecall->nshost()));
ScopedParam sp1(lc, Param("NSFILEID", m_fileToRecall->fileid()));
ScopedParam sp2(lc, Param("BlockId", castor::tape::tapeFile::BlockId::extract(*m_fileToRecall)));
......@@ -56,8 +74,11 @@ public:
// Read the file and transmit it
bool stillReading = true;
//for counting how many mem blocks have used and how many tape blocks
//(because one mem block can hold several tape blocks
int fileBlock = 0;
int tapeBlock = 0;
MemBlock* mb=NULL;
try {
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
......@@ -125,7 +146,12 @@ public:
}
private:
// Open the file and manage failure (if any)
/**
* Open the file on the tape. In case of failure, log and throw
* Copying the auto_ptr on the calling point will give us the ownership of the
* object.
* @return if successful, return an auto_ptr on the ReadFile we want
*/
std::auto_ptr<castor::tape::tapeFile::ReadFile> openReadFile(
castor::tape::tapeFile::ReadSession & rs, castor::log::LogContext & lc){
......@@ -145,8 +171,20 @@ private:
}
return rf;
}
/**
* All we need to know about the file we are recalling
*/
std::auto_ptr<castor::tape::tapegateway::FileToRecallStruct> m_fileToRecall;
/**
* The task (seen as a Y) that will consume all the blocks we read
*/
DataConsumer & m_fifo;
/**
* The MemoryManager from whom we get free memory blocks
*/
RecallMemoryManager & m_mm;
};
......
......@@ -24,7 +24,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
......@@ -36,7 +36,7 @@ class TapeReadTaskInterface {
public:
virtual void execute(castor::tape::tapeFile::ReadSession & /*rs*/,
castor::log::LogContext & /*lc*/) {
throw MemException("Tring to execute a non-execuatble TapeReadTask");
throw castor::tape::exceptions::MemException("Tring to execute a non-execuatble TapeReadTask");
};
virtual ~TapeReadTaskInterface() {}
};
......
......@@ -26,7 +26,6 @@ protected:
size_t m_filesProcessed;
std::string m_vid;
castor::log::LogContext m_logContext;
castor::tape::tapeserver::daemon::TaskInjector * m_taskInjector;
public:
void finish() { m_tasks.push(NULL); }
void push(Task * t) { m_tasks.push(t); }
......
......@@ -32,29 +32,32 @@
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
namespace {
unsigned long initAdler32Checksum() {
return adler32(0L,Z_NULL,0);
}
}
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
TapeWriteTask::TapeWriteTask(int blockCount, tapegateway::FileToMigrateStruct* file,
MigrationMemoryManager& mm,castor::tape::threading::AtomicFlag& errorFlag):
m_fileToMigrate(file),m_memManager(mm), m_fifo(blockCount),
m_blockCount(blockCount),m_errorFlag(errorFlag)
{
//register its fifo to the memory manager as a client in order to get mem block
mm.addClient(&m_fifo);
}
//------------------------------------------------------------------------------
// fileSize
//------------------------------------------------------------------------------
int TapeWriteTask::fileSize() {
return m_fileToMigrate->fileSize();
}
//------------------------------------------------------------------------------
// execute
//------------------------------------------------------------------------------
void TapeWriteTask::execute(castor::tape::tapeFile::WriteSession & session,
MigrationReportPacker & reportPacker,castor::log::LogContext& lc) {
using castor::log::LogContext;
......@@ -124,22 +127,30 @@ namespace daemon {
reportPacker.reportFailedJob(*m_fileToMigrate,e.getMessageValue(),e.code());
}
}
//------------------------------------------------------------------------------
// getFreeBlock
//------------------------------------------------------------------------------
MemBlock * TapeWriteTask::getFreeBlock() {
return m_fifo.getFreeBlock();
}
//------------------------------------------------------------------------------
// pushDataBlock
//------------------------------------------------------------------------------
void TapeWriteTask::pushDataBlock(MemBlock *mb) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.pushDataBlock(mb);
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
TapeWriteTask::~TapeWriteTask() {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
}
//------------------------------------------------------------------------------
// openWriteFile
//------------------------------------------------------------------------------
std::auto_ptr<tapeFile::WriteFile> TapeWriteTask::openWriteFile(
tape::tapeFile::WriteSession & session, log::LogContext& lc){
std::auto_ptr<tape::tapeFile::WriteFile> output;
......
......@@ -127,6 +127,10 @@ private:
*/
int m_blockCount;
/**
* A shared flag among the the tasks and the task injector, set as true as soon
* as task failed to do its job
*/
castor::tape::threading::AtomicFlag& m_errorFlag;
};
......
......@@ -24,7 +24,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
......
......@@ -53,12 +53,22 @@ namespace tape {
EndOfFile(const std::string & w): castor::exception::Exception(w) {}
virtual ~EndOfFile() throw() {}
};
/**
* Used
*/
class ErrorFlag : public castor::exception::Exception {
public:
ErrorFlag(): castor::exception::Exception("Internal exception, should not be seen") {}
virtual ~ErrorFlag() throw() {}
};
class MemException: public castor::tape::Exception {
public:
MemException(const std::string & what): Exception(what) {}
virtual ~MemException() throw() {}
};
}
} //namespace tape
} //namespace castor
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment