Commit 24528285 authored by Eric Cano's avatar Eric Cano
Browse files

Catching up with tapeserver branch

parents f6c21020 10b0f9f0
......@@ -58,16 +58,19 @@ namespace daemon {
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
try{
//we first check here to not even try to open the disk if a previous task has failed
//because the disk could the very reason why the previous one failed,
//so dont do the same mistake twice !
hasAnotherTaskTailed();
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
log::LogContext::ScopedParam sp(lc, log::Param("filePath",m_migratedFile->path()));
lc.log(LOG_INFO,"Opened file on disk for migration ");
while(migratingFileSize>0){
//if a task has signaled an error, we stop our job
if(m_errorFlag){
throw castor::tape::exceptions::ErrorFlag();
}
hasAnotherTaskTailed();
MemBlock* const mb = m_nextTask.getFreeBlock();
AutoPushBlock push(mb,m_nextTask);
......
......@@ -47,7 +47,12 @@ public:
virtual void execute(log::LogContext& lc);
private:
void hasAnotherTaskTailed() const {
//if a task has signaled an error, we stop our job
if(m_errorFlag){
throw castor::tape::exceptions::ErrorFlag();
}
}
void circulateAllBlocks(size_t fromBlockId);
/**
* The task (a TapeWriteTask) that will handle the read blocks
......
......@@ -103,7 +103,9 @@ namespace daemon {
void MigrationTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
if(!m_errorFlag) {
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
}
}
bool MigrationTaskInjector::synchronousInjection(uint64_t maxFiles,
......@@ -155,9 +157,17 @@ namespace daemon {
} //end of while(1)
}//end of try
catch(const castor::tape::exceptions::ErrorFlag&){
//we end up there because a task screw up somewhere
m_parent.m_lc.log(LOG_ERR,"In MigrationTaskInjector::WorkerThread::run(): a task screw up, "
"finishing and discarding all tasks ");
//first send the end signal to the threads
m_parent.m_tapeWriter.finish();
m_parent.m_diskReader.finish();
//discard all the tasks !!
while(m_parent.m_queue.size()>0){
Request req = m_parent.m_queue.pop();
m_parent.m_queue.pop();
}
} // end of while(1)
//-------------
......
......@@ -23,7 +23,7 @@ namespace tapeserver{
namespace daemon {
RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTask> & tapeReader,
TapeSingleThreadInterface<TapeReadTaskInterface> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,
client::ClientInterface& client,castor::log::LogContext lc) :
m_thread(*this),m_memManager(mm),
......@@ -68,8 +68,8 @@ void RecallTaskInjector::injectBulkRecalls(const std::vector<castor::tape::tapeg
new DiskWriteTask(
removeOwningList(dynamic_cast<tape::tapegateway::FileToRecallStruct*>((*it)->clone())),
m_memManager);
TapeReadFileTask * trt =
new TapeReadFileTask(
TapeReadTask * trt =
new TapeReadTask(
removeOwningList(
dynamic_cast<tape::tapegateway::FileToRecallStruct*>((*it)->clone())),
*dwt,
......
......@@ -29,7 +29,7 @@
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadFileTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
......@@ -49,7 +49,7 @@ class RecallTaskInjector: public TaskInjector {
public:
RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTask> & tapeReader,
TapeSingleThreadInterface<TapeReadTaskInterface> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,client::ClientInterface& client,
castor::log::LogContext lc);
......@@ -138,7 +138,7 @@ private:
RecallMemoryManager & m_memManager;
TapeSingleThreadInterface<TapeReadTask> & m_tapeReader;
TapeSingleThreadInterface<TapeReadTaskInterface> & m_tapeReader;
DiskThreadPoolInterface<DiskWriteTaskInterface> & m_diskWriter;
client::ClientInterface& m_client;
......
......@@ -32,14 +32,14 @@ public:
}
};
class FakeSingleTapeReadThread : public TapeSingleThreadInterface<TapeReadTask>
class FakeSingleTapeReadThread : public TapeSingleThreadInterface<TapeReadTaskInterface>
{
public:
using TapeSingleThreadInterface<TapeReadTask>::m_tasks;
using TapeSingleThreadInterface<TapeReadTaskInterface>::m_tasks;
FakeSingleTapeReadThread(castor::tape::drives::DriveInterface& drive,
const std::string & vid, castor::log::LogContext & lc):
TapeSingleThreadInterface<TapeReadTask>(drive, vid, lc){}
TapeSingleThreadInterface<TapeReadTaskInterface>(drive, vid, lc){}
~FakeSingleTapeReadThread(){
const unsigned int size= m_tasks.size();
......@@ -51,7 +51,7 @@ public:
{
m_tasks.push(NULL);
}
virtual void push(TapeReadTask* t){
virtual void push(TapeReadTaskInterface* t){
m_tasks.push(t);
}
};
......@@ -92,11 +92,11 @@ TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNominal) {
for(int i=0;i<1;++i)
{
DiskWriteTaskInterface* diskWriteTask=diskWrite.m_tasks.pop();
TapeReadTask* tapeReadTask=tapeRead.m_tasks.pop();
TapeReadTaskInterface* tapeReadTask=tapeRead.m_tasks.pop();
//static_cast is needed otherwise compilation fails on SL5 with a raw NULL
ASSERT_EQ(static_cast<DiskWriteTaskInterface*>(NULL),diskWriteTask);
ASSERT_EQ(static_cast<TapeReadTask*>(NULL),tapeReadTask);
ASSERT_EQ(static_cast<TapeReadTaskInterface*>(NULL),tapeReadTask);
delete diskWriteTask;
delete tapeReadTask;
}
......
/******************************************************************************
* TapeReadFileTask.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class TapeReadFileTask: public TapeReadTask {
public:
TapeReadFileTask(castor::tape::tapegateway::FileToRecallStruct * ftr,
DataConsumer & destination, RecallMemoryManager & mm):
m_fileToRecall(ftr), m_fifo(destination), m_mm(mm) {}
virtual void execute(castor::tape::tapeFile::ReadSession & rs,
castor::log::LogContext & lc) {
using castor::log::Param;
typedef castor::log::LogContext::ScopedParam ScopedParam;
// Placeholder for the tape file read
// Set the common context for all the omming logs (file info)
ScopedParam sp0(lc, Param("NSHOSTNAME", m_fileToRecall->nshost()));
ScopedParam sp1(lc, Param("NSFILEID", m_fileToRecall->fileid()));
ScopedParam sp2(lc, Param("BlockId", castor::tape::tapeFile::BlockId::extract(*m_fileToRecall)));
ScopedParam sp3(lc, Param("fSeq", m_fileToRecall->fseq()));
ScopedParam sp4(lc, Param("fileTransactionId", m_fileToRecall->fileTransactionId()));
// Read the file and transmit it
bool stillReading = true;
int fileBlock = 0;
int tapeBlock = 0;
MemBlock* mb=NULL;
try {
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
while (stillReading) {
// Get a memory block and add information to its metadata
mb=m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileBlock = fileBlock++;
mb->m_fileid = m_fileToRecall->fileid();
mb->m_tapeFileBlock = tapeBlock;
mb->m_tapeBlockSize = rf->getBlockSize();
try {
// Fill up the memory block with tape block
// append conveniently returns false when there will not be more space
// for an extra tape block, and throws an exception if we reached the
// end of file. append() also protects against reading too big tape blocks.
while (mb->m_payload.append(*rf)) {
tapeBlock++;
}
} catch (const castor::tape::exceptions::EndOfFile&) {
// append() signaled the end of the file.
stillReading = false;
}
// Pass the block to the disk write task
m_fifo.pushDataBlock(mb);
} //end of while(stillReading)
} //end of try
catch (castor::exception::Exception & ex) {
//we end up there because :
//-- openReadFile brought us here (cant put the tape into position)
//-- m_payload.append brought us here (error while reading the file)
// This is an error case. Log and signal to the disk write task
{
castor::log::LogContext::ScopedParam sp0(lc, Param("fileBlock", fileBlock));
castor::log::LogContext::ScopedParam sp1(lc, Param("ErrorMessage", ex.getMessageValue()));
castor::log::LogContext::ScopedParam sp2(lc, Param("ErrorCode", ex.code()));
lc.log(LOG_ERR, "Error reading a file block in TapeReadFileTask (backtrace follows)");
}
{
castor::log::LogContext lc2(lc.logger());
lc2.logBacktrace(LOG_ERR, ex.backtrace());
}
//if we end up there because openReadFile brought us here
//then mb is not valid, we need to get a block
if(!mb) {
mb=m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid();
}
//mark the block failed and push it
mb->markAsFailed();
m_fifo.pushDataBlock(mb);
m_fifo.pushDataBlock(NULL);
return;
}
// In all cases, we have to signal the end of the tape read to the disk write
// task.
m_fifo.pushDataBlock(NULL);
lc.log(LOG_DEBUG, "File read completed");
}
private:
// Open the file and manage failure (if any)
std::auto_ptr<castor::tape::tapeFile::ReadFile> openReadFile(
castor::tape::tapeFile::ReadSession & rs, castor::log::LogContext & lc){
using castor::log::Param;
typedef castor::log::LogContext::ScopedParam ScopedParam;
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf;
try {
rf.reset(new castor::tape::tapeFile::ReadFile(&rs, *m_fileToRecall));
lc.log(LOG_DEBUG, "Successfully opened the tape file");
} catch (castor::exception::Exception & ex) {
// Log the error
ScopedParam sp0(lc, Param("ErrorMessage", ex.getMessageValue()));
ScopedParam sp1(lc, Param("ErrorCode", ex.code()));
lc.log(LOG_ERR, "Failed to open tape file for reading");
throw;
}
return rf;
}
std::auto_ptr<castor::tape::tapegateway::FileToRecallStruct> m_fileToRecall;
DataConsumer & m_fifo;
RecallMemoryManager & m_mm;
};
}
}
}
}
......@@ -26,7 +26,7 @@
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTaskInterface.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
......@@ -37,18 +37,18 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class TapeReadSingleThread : public TapeSingleThreadInterface<TapeReadTask>{
class TapeReadSingleThread : public TapeSingleThreadInterface<TapeReadTaskInterface>{
public:
TapeReadSingleThread(castor::tape::drives::DriveInterface & drive,
const std::string vid, uint64_t maxFilesRequest,
castor::log::LogContext & lc):
TapeSingleThreadInterface<TapeReadTask>(drive, vid, lc),
TapeSingleThreadInterface<TapeReadTaskInterface>(drive, vid, lc),
m_maxFilesRequest(maxFilesRequest) {}
void setTaskInjector(TaskInjector * ti) { m_taskInjector = ti; }
private:
TapeReadTask * popAndRequestMoreJobs() {
castor::tape::threading::BlockingQueue<TapeReadTask *>::valueRemainingPair
TapeReadTaskInterface * popAndRequestMoreJobs() {
castor::tape::threading::BlockingQueue<TapeReadTaskInterface *>::valueRemainingPair
vrp = m_tasks.popGetSize();
// If we just passed (down) the half full limit, ask for more
// (the remaining value is after pop)
......@@ -81,7 +81,7 @@ private:
// the task injector
while(1) {
// NULL indicated the end of work
TapeReadTask * task = popAndRequestMoreJobs();
TapeReadTaskInterface * task = popAndRequestMoreJobs();
m_logContext.log(LOG_DEBUG, "TapeReadThread: just got one more job");
if (task) {
task->execute(*rs, m_logContext);
......
......@@ -24,23 +24,134 @@
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTaskInterface.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class TapeReadTask {
class TapeReadTask: public TapeReadTaskInterface {
public:
virtual void execute(castor::tape::tapeFile::ReadSession & /*rs*/,
castor::log::LogContext & /*lc*/) {
throw MemException("Tring to execute a non-execuatble TapeReadTask");
};
virtual ~TapeReadTask() {}
TapeReadTask(castor::tape::tapegateway::FileToRecallStruct * ftr,
DataConsumer & destination, RecallMemoryManager & mm):
m_fileToRecall(ftr), m_fifo(destination), m_mm(mm) {}
virtual void execute(castor::tape::tapeFile::ReadSession & rs,
castor::log::LogContext & lc) {
using castor::log::Param;
typedef castor::log::LogContext::ScopedParam ScopedParam;
// Placeholder for the tape file read
// Set the common context for all the omming logs (file info)
ScopedParam sp0(lc, Param("NSHOSTNAME", m_fileToRecall->nshost()));
ScopedParam sp1(lc, Param("NSFILEID", m_fileToRecall->fileid()));
ScopedParam sp2(lc, Param("BlockId", castor::tape::tapeFile::BlockId::extract(*m_fileToRecall)));
ScopedParam sp3(lc, Param("fSeq", m_fileToRecall->fseq()));
ScopedParam sp4(lc, Param("fileTransactionId", m_fileToRecall->fileTransactionId()));
// Read the file and transmit it
bool stillReading = true;
int fileBlock = 0;
int tapeBlock = 0;
MemBlock* mb=NULL;
try {
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
while (stillReading) {
// Get a memory block and add information to its metadata
mb=m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileBlock = fileBlock++;
mb->m_fileid = m_fileToRecall->fileid();
mb->m_tapeFileBlock = tapeBlock;
mb->m_tapeBlockSize = rf->getBlockSize();
try {
// Fill up the memory block with tape block
// append conveniently returns false when there will not be more space
// for an extra tape block, and throws an exception if we reached the
// end of file. append() also protects against reading too big tape blocks.
while (mb->m_payload.append(*rf)) {
tapeBlock++;
}
} catch (const castor::tape::exceptions::EndOfFile&) {
// append() signaled the end of the file.
stillReading = false;
}
// Pass the block to the disk write task
m_fifo.pushDataBlock(mb);
} //end of while(stillReading)
} //end of try
catch (castor::exception::Exception & ex) {
//we end up there because :
//-- openReadFile brought us here (cant put the tape into position)
//-- m_payload.append brought us here (error while reading the file)
// This is an error case. Log and signal to the disk write task
{
castor::log::LogContext::ScopedParam sp0(lc, Param("fileBlock", fileBlock));
castor::log::LogContext::ScopedParam sp1(lc, Param("ErrorMessage", ex.getMessageValue()));
castor::log::LogContext::ScopedParam sp2(lc, Param("ErrorCode", ex.code()));
lc.log(LOG_ERR, "Error reading a file block in TapeReadFileTask (backtrace follows)");
}
{
castor::log::LogContext lc2(lc.logger());
lc2.logBacktrace(LOG_ERR, ex.backtrace());
}
//if we end up there because openReadFile brought us here
//then mb is not valid, we need to get a block
if(!mb) {
mb=m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid();
}
//mark the block failed and push it
mb->markAsFailed();
m_fifo.pushDataBlock(mb);
m_fifo.pushDataBlock(NULL);
return;
}
// In all cases, we have to signal the end of the tape read to the disk write
// task.
m_fifo.pushDataBlock(NULL);
lc.log(LOG_DEBUG, "File read completed");
}
private:
// Open the file and manage failure (if any)
std::auto_ptr<castor::tape::tapeFile::ReadFile> openReadFile(
castor::tape::tapeFile::ReadSession & rs, castor::log::LogContext & lc){
using castor::log::Param;
typedef castor::log::LogContext::ScopedParam ScopedParam;
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf;
try {
rf.reset(new castor::tape::tapeFile::ReadFile(&rs, *m_fileToRecall));
lc.log(LOG_DEBUG, "Successfully opened the tape file");
} catch (castor::exception::Exception & ex) {
// Log the error
ScopedParam sp0(lc, Param("ErrorMessage", ex.getMessageValue()));
ScopedParam sp1(lc, Param("ErrorCode", ex.code()));
lc.log(LOG_ERR, "Failed to open tape file for reading");
throw;
}
return rf;
}
std::auto_ptr<castor::tape::tapegateway::FileToRecallStruct> m_fileToRecall;
DataConsumer & m_fifo;
RecallMemoryManager & m_mm;
};
}
}
}
}
/******************************************************************************
* TapeReadTaskInterface.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class TapeReadTaskInterface {
public:
virtual void execute(castor::tape::tapeFile::ReadSession & /*rs*/,
castor::log::LogContext & /*lc*/) {
throw MemException("Tring to execute a non-execuatble TapeReadTask");
};
virtual ~TapeReadTaskInterface() {}
};
}
}
}
}
......@@ -63,16 +63,23 @@ namespace daemon {
unsigned long ckSum = Payload::zeroAdler32();
int blockId = 0;
try {
//we first check here to not even try to move the tape if a previous task has failed
//because the tape- could the very reason why the previous one failed,
//so dont do the same mistake twice !
hasAnotherTaskTailed();
//try to open the session
std::auto_ptr<castor::tape::tapeFile::WriteFile> output(openWriteFile(session,lc));
while(!m_fifo.finished()) {
if(m_errorFlag){
throw castor::tape::exceptions::ErrorFlag();
}
//if someone screw somewhere else, we stop
hasAnotherTaskTailed();
MemBlock* const mb = m_fifo.popDataBlock();
AutoReleaseBlock<MemoryManager> releaser(mb,m_memManager);
if(/*m_migratingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
* || */blockId != mb->m_fileBlock || mb->m_failed ){
if(m_fileToMigrate->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->m_failed ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
......@@ -84,11 +91,13 @@ namespace daemon {
throw castor::tape::Exception("received a bad block for writing");
}
ckSum = mb->m_payload.adler32(ckSum);
mb->m_payload.write(*output);
++blockId;
}
//finish the writing of the file on tape
//put the trailer
output->close();
reportPacker.reportCompletedJob(*m_fileToMigrate,ckSum);