Commit e36dc62e authored by Eric Cano's avatar Eric Cano
Browse files

Catching up with parallel development.

parents 10fc09df e45e5e2e
......@@ -43,10 +43,12 @@ add_executable(tapeserver-mm
DiskWriteTask.cpp
MigrationTaskInjector.cpp)
find_package( ZLIB REQUIRED )
target_link_libraries(tapeserver-mm
TapeDrive Exception SCSI System Utils File
castorcommon castorclient castorTapeServerThreading castortapeutils
castortapegatewayprotocol tapeserver)
castortapegatewayprotocol tapeserver ${ZLIB_LIBRARIES} )
add_library(tapeserver
../client/ClientProxy.cpp
......@@ -59,7 +61,7 @@ add_library(tapeserver
TapeWriteTask.cpp
DiskReadTask.cpp
DiskWriteTask.cpp
MigrationTaskInjector.cpp)
MigrationTaskInjector.cpp)
add_library(tapeserverdTest
......
......@@ -57,6 +57,8 @@ namespace daemon {
size_t migratingFileSize=m_migratedFile->fileSize();
try{
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
log::LogContext::ScopedParam sp(lc, log::Param("filePath",m_migratedFile->path()));
lc.log(LOG_INFO,"Opened file on disk for migration ");
while(migratingFileSize>0){
MemBlock* const mb = m_nextTask.getFreeBlock();
AutoPushBlock push(mb,m_nextTask);
......
......@@ -41,9 +41,19 @@ public:
virtual void execute(log::LogContext& lc);
private:
//TW ; tape write
/**
* The task (a TapeWriteTask) that will handle the read blocks
*/
DataConsumer & m_nextTask;
/**
* All we need to know about the file we are migrating
*/
std::auto_ptr<tape::tapegateway::FileToMigrateStruct> m_migratedFile;
/**
* The number of memory block we will need to read the whole file
*/
size_t m_numberOfBlock;
};
......
......@@ -81,19 +81,22 @@ namespace daemon {
return ret;
}
void DiskReadThreadPool::DiskReadWorkerThread::run() {
_this.m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
_this.m_lc.log(LOG_DEBUG, "Starting DiskReadWorkerThread");
m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
m_lc.log(LOG_DEBUG, "Starting DiskReadWorkerThread");
std::auto_ptr<DiskReadTaskInterface> task;
while(1) {
task.reset( _this.popAndRequestMore());
task.reset( m_parent.popAndRequestMore());
if (NULL!=task.get()) {
task->execute(lc);
task->execute(m_lc);
}
else {
break;
}
} //end of while(1)
_this.m_lc.log(LOG_DEBUG, "Finishing of DiskReadWorkerThread");
// We now acknowledge to the task injector that read reached the end. There
// will hence be no more requests for more.
m_parent.m_injector->finish();
m_lc.log(LOG_DEBUG, "Finishing of DiskReadWorkerThread");
}
tape::threading::AtomicCounter<int> DiskReadThreadPool::DiskReadWorkerThread::m_nbActiveThread(0);
......
......@@ -50,12 +50,12 @@ public:
void setTaskInjector(MigrationTaskInjector* injector){
m_injector = injector;
}
DiskReadTaskInterface* popAndRequestMore();
private:
DiskReadTaskInterface* popAndRequestMore();
class DiskReadWorkerThread: private castor::tape::threading::Thread {
public:
DiskReadWorkerThread(DiskReadThreadPool & manager):
threadID(m_nbActiveThread++),_this(manager),lc(_this.m_lc) {
threadID(m_nbActiveThread++),m_parent(manager),lc(m_parent.m_lc) {
log::LogContext::ScopedParam param(lc, log::Param("threadID", threadID));
lc.log(LOG_INFO,"DiskWrite Thread created");
}
......@@ -64,7 +64,7 @@ private:
private:
static tape::threading::AtomicCounter<int> m_nbActiveThread;
const int threadID;
DiskReadThreadPool & _this;
DiskReadThreadPool & m_parent;
castor::log::LogContext lc;
virtual void run();
......
......@@ -25,6 +25,11 @@
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
namespace {
//todo : merge it with one of TapeWriteTask and put is somewhere
unsigned long initAdler32Checksum() {
return adler32(0L,Z_NULL,0);
}
/*Use RAII to make sure the memory block is released
*(ie pushed back to the memory manager) in any case (exception or not)
*/
......@@ -65,7 +70,7 @@ namespace daemon {
try{
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
int blockId = 0;
unsigned long checksum = initAdler32Checksum();
while(1) {
if(MemBlock* const mb = m_fifo.pop()) {
AutoReleaseBlock releaser(mb,m_memManager);
......@@ -84,14 +89,15 @@ namespace daemon {
throw castor::tape::Exception("received a bad block for writing");
}
mb->m_payload.write(ourFile);
checksum = mb->m_payload.adler32(checksum);
blockId++;
}
else
break;
} //end of while(1)
reporter.reportCompletedJob(*m_recallingFile);
reporter.reportCompletedJob(*m_recallingFile,checksum);
return true;
}
} //end of try
catch(const castor::exception::Exception& e){
/*
*We might end up there with some blocks into m_fifo
......
......@@ -15,7 +15,7 @@ namespace unitTests{
using namespace castor::tape::tapeserver::daemon;
using namespace castor::tape::tapeserver::client;
struct MockRecallReportPacker : public ReportPackerInterface<detail::Recall>{
MOCK_METHOD1(reportCompletedJob,void(const FileStruct&));
MOCK_METHOD2(reportCompletedJob,void(const FileStruct&,unsigned long));
MOCK_METHOD3(reportFailedJob, void(const FileStruct& ,const std::string&,int));
MOCK_METHOD0(reportEndOfSession, void());
MOCK_METHOD2(reportEndOfSessionWithErrors, void(const std::string,int));
......
......@@ -121,8 +121,6 @@ namespace daemon {
//Im the last Thread alive, report end of session
if(m_parentThreadPool.m_failedWriteCount==0){
m_parentThreadPool.m_reporter.reportEndOfSession();
//TODO
// _this.m_jobInjector->end();
}
else{
m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
......
......@@ -58,7 +58,8 @@ private:
bool crossingDownFileThreshod(int filesPopped) const;
/**
* Pop a task frem m_tasks , if
* Pop a task from m_tasks.
* TODO The loopBack part (AndRequestMoreJob) should move to TapeReadSingleSthread
* @return
*/
DiskWriteTaskInterface * popAndRequestMoreJobs() ;
......
......@@ -15,7 +15,7 @@ namespace unitTests{
using namespace castor::tape::tapeserver::daemon;
using namespace castor::tape::tapeserver::client;
struct MockRecallReportPacker : public ReportPackerInterface<detail::Recall>{
MOCK_METHOD1(reportCompletedJob,void(const FileStruct&));
MOCK_METHOD2(reportCompletedJob,void(const FileStruct&,unsigned long));
MOCK_METHOD3(reportFailedJob, void(const FileStruct& ,const std::string&,int));
MOCK_METHOD0(reportEndOfSession, void());
MOCK_METHOD2(reportEndOfSessionWithErrors, void(const std::string,int));
......@@ -34,7 +34,7 @@ namespace unitTests{
castor::log::LogContext lc(log);
MockRecallReportPacker report(client,lc);
EXPECT_CALL(report,reportCompletedJob(_)).Times(5);
EXPECT_CALL(report,reportCompletedJob(_,_)).Times(5);
//EXPECT_CALL(tskInjectorl,requestInjection(_,_,_)).Times(2);
EXPECT_CALL(report,reportEndOfSession()).Times(1);
......
......@@ -27,127 +27,13 @@
#include "castor/tape/tapeserver/daemon/Exception.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include <memory>
#include "castor/tape/tapeserver/daemon/Payload.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* Class managing a fixed size payload buffer. Some member functions also
* allow read
* @param capacity Size of the payload buffer in bytes
*/
class Payload
{
public:
Payload(size_t capacity):
m_payload(new (std::nothrow) char[capacity]),m_totalCapacity(capacity),m_size(0) {
if(NULL == m_payload) {
throw MemException("Failed to allocate memory for a new MemBlock!");
}
}
~Payload(){
delete[] m_payload;
}
/** Amount of data present in the payload buffer */
size_t size() const {
return m_size;
}
/** Remaining free space in the payload buffer */
size_t remainingFreeSpace() const {
return m_totalCapacity - m_size;
}
/** Total size of the payload block */
size_t totalCapacity() const {
return m_totalCapacity;
}
/** Returns a pointer to the beginning of the payload block */
char* get(){
return m_payload;
}
/** Returns a pointer to the beginning of the payload block (readonly version) */
char const* get() const {
return m_payload;
}
/**
* Reads all the buffer in one go from a diskFile::ReadFile object
* @param from reference to the diskFile::ReadFile
*/
size_t read(tape::diskFile::ReadFile& from){
m_size = from.read(m_payload,m_totalCapacity);
return m_size;
}
class EndOfFile: public castor::exception::Exception {
public:
EndOfFile(const std::string & w): castor::exception::Exception(w) {}
virtual ~EndOfFile() throw() {}
};
/**
* Reads one block from a tapeFile::readFile
* @throws castor::tape::daemon::Payload::EOF
* @param from reference to the tapeFile::ReadFile
* @return whether another tape block will fit in the memory block.
*/
bool append(tape::tapeFile::ReadFile & from){
if (from.getBlockSize() > remainingFreeSpace()) {
std::stringstream err;
err << "Trying to read a tape file block with too little space left: BlockSize="
<< from.getBlockSize() << " remainingFreeSpace=" << remainingFreeSpace()
<< " (totalSize=" << m_totalCapacity << ")";
throw MemException(err.str());
}
size_t readSize;
try {
readSize = from.read(m_payload + m_size, from.getBlockSize());
} catch (castor::tape::tapeFile::EndOfFile) {
throw EndOfFile("In castor::tape::tapeserver::daemon::Payload::append: reached end of file");
}
m_size += readSize;
return from.getBlockSize() <= remainingFreeSpace();
}
/**
* Write the complete buffer to a diskFile::WriteFile
* @param to reference to the diskFile::WriteFile
*/
void write(tape::diskFile::WriteFile& to){
to.write(m_payload,m_size);
}
/**
* Write the complete buffer to a tapeFile::WriteFile, tape block by
* tape block
* @param to reference to the tapeFile::WriteFile
*/
void write(tape::tapeFile::WriteFile& to) {
size_t blockSize = to.getBlockSize();
size_t writePosition = 0;
// Write all possible full tape blocks
while (m_size - writePosition > blockSize) {
to.write(m_payload + writePosition, blockSize);
writePosition += blockSize;
}
// Write a remainder, if any
if (m_size - writePosition) {
to.write(m_payload + writePosition, m_size - writePosition);
}
}
private:
char* m_payload;
size_t m_totalCapacity;
size_t m_size;
};
/**
* Individual memory block with metadata
*/
......
......@@ -51,8 +51,9 @@ MigrationReportPacker::~MigrationReportPacker(){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
}
void MigrationReportPacker::reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile));
void MigrationReportPacker::reportCompletedJob(
const tapegateway::FileToMigrateStruct& migratedFile,unsigned long checksum) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile,checksum));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
......@@ -85,6 +86,10 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _th
successMigration->setId(m_migratedFile.id());
successMigration->setNshost(m_migratedFile.nshost());
successMigration->setFileid(m_migratedFile.fileid());
successMigration->setChecksum(m_checksum);
//WARNING; Ad-hoc name of the ChecksumName !!
successMigration->setChecksumName("adler32");
_this.m_listReports->addSuccessfulMigrations(successMigration.release());
}
......
......@@ -52,7 +52,8 @@ public:
* of migratedFile
* @param migratedFile the file successfully migrated
*/
void reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile);
void reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile,
unsigned long checksum);
/**
* Create into the MigrationReportPacker a report for the failled migration
......@@ -89,9 +90,10 @@ private:
};
class ReportSuccessful : public Report {
const FileStruct m_migratedFile;
const unsigned long m_checksum;
public:
ReportSuccessful(const FileStruct& file):
m_migratedFile(file){}
ReportSuccessful(const FileStruct& file,unsigned long checksum):
m_migratedFile(file),m_checksum(checksum){}
virtual void execute(MigrationReportPacker& _this);
};
class ReportFlush : public Report {
......
......@@ -44,8 +44,8 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerNominal) {
tapegateway::FileToMigrateStruct migratedFile;
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFlush();
mrp.reportEndOfSession();
mrp.waitThread();
......@@ -68,9 +68,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillure) {
tapegateway::FileToMigrateStruct migratedFile;
tapegateway::FileToMigrateStruct failed;
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFailedJob(failed,error,-1);
mrp.reportFlush();
mrp.reportEndOfSessionWithErrors(error,-1);
......@@ -93,9 +93,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillureGoodEnd) {
tapegateway::FileToMigrateStruct migratedFile;
tapegateway::FileToMigrateStruct failed;
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFailedJob(failed,error,-1);
mrp.reportFlush();
mrp.reportEndOfSession();
......@@ -120,9 +120,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerGoodBadEnd) {
tapegateway::FileToMigrateStruct migratedFile;
tapegateway::FileToMigrateStruct failed;
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFlush();
mrp.reportEndOfSessionWithErrors(error,-1);
......
......@@ -72,13 +72,13 @@ namespace daemon {
tape::utils::suppresUnusedVariable(sp);
m_lc.log(LOG_INFO, "Logged file to migrate");
const u_signed64 neededBlock = fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1);
const u_signed64 neededBlock = howManyBlocksNeeded(fileSize,blockCapacity);
TapeWriteTask *twt = new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager);
DiskReadTask *drt = new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock);
std::auto_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager));
std::auto_ptr<DiskReadTask> drt(new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock));
m_tapeWriter.push(twt);
m_diskReader.push(drt);
m_tapeWriter.push(twt.release());
m_diskReader.push(drt.release());
}
}
......@@ -97,7 +97,6 @@ namespace daemon {
}
void MigrationTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
//@TODO where shall we acquire the lock ? There of just before the push ?
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
}
......
......@@ -69,6 +69,12 @@ public:
void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
private:
/*Compute how many blocks are needed for a file of fileSize bytes*/
size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
return fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1);
}
/**
* A request of files to migrate. We request EITHER
* - a maximum of nbMaxFiles files
......
/******************************************************************************
* Payload.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include <zlib.h>
#pragma once
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* Class managing a fixed size payload buffer. Some member functions also
* allow read
* @param capacity Size of the payload buffer in bytes
*/
class Payload
{
public:
Payload(size_t capacity):
m_payload(new (std::nothrow) unsigned char[capacity]),m_totalCapacity(capacity),m_size(0) {
if(NULL == m_payload) {
throw MemException("Failed to allocate memory for a new MemBlock!");
}
}
~Payload(){
delete[] m_payload;
}
/** Amount of data present in the payload buffer */
size_t size() const {
return m_size;
}
/** Remaining free space in the payload buffer */
size_t remainingFreeSpace() const {
return m_totalCapacity - m_size;
}
/** Total size of the payload block */
size_t totalCapacity() const {
return m_totalCapacity;
}
/** Returns a pointer to the beginning of the payload block */
unsigned char* get(){
return m_payload;
}
/** Returns a pointer to the beginning of the payload block (readonly version) */
unsigned char const* get() const {
return m_payload;
}
/**
* Reads all the buffer in one go from a diskFile::ReadFile object
* @param from reference to the diskFile::ReadFile
*/
size_t read(tape::diskFile::ReadFile& from){
m_size = from.read(m_payload,m_totalCapacity);
return m_size;
}
class EndOfFile: public castor::exception::Exception {
public:
EndOfFile(const std::string & w): castor::exception::Exception(w) {}
virtual ~EndOfFile() throw() {}
};
/**
* Reads one block from a tapeFile::readFile
* @throws castor::tape::daemon::Payload::EOF
* @param from reference to the tapeFile::ReadFile
* @return whether another tape block will fit in the memory block.
*/
bool append(tape::tapeFile::ReadFile & from){
if (from.getBlockSize() > remainingFreeSpace()) {
std::stringstream err;
err << "Trying to read a tape file block with too little space left: BlockSize="
<< from.getBlockSize() << " remainingFreeSpace=" << remainingFreeSpace()
<< " (totalSize=" << m_totalCapacity << ")";
throw MemException(err.str());
}
size_t readSize;
try {
readSize = from.read(m_payload + m_size, from.getBlockSize());
} catch (castor::tape::tapeFile::EndOfFile) {
throw EndOfFile("In castor::tape::tapeserver::daemon::Payload::append: reached end of file");
}
m_size += readSize;
return from.getBlockSize() <= remainingFreeSpace();
}
/**
* Write the complete buffer to a diskFile::WriteFile
* @param to reference to the diskFile::WriteFile
*/
void write(tape::diskFile::WriteFile& to){
to.write(m_payload,m_size);
}
/**
* Write the complete buffer to a tapeFile::WriteFile, tape block by
* tape block
* @param to reference to the tapeFile::WriteFile
*/
void write(tape::tapeFile::WriteFile& to) {
size_t blockSize = to.getBlockSize();