Commit 3d157190 authored by David COME's avatar David COME
Browse files

Computing compressed files' size for migration is in place

parent 4d960a87
......@@ -26,6 +26,7 @@
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include "castor/tape/tapegateway/FileErrorReportStruct.hpp"
#include "castor/tape/tapegateway/FileMigratedNotificationStruct.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include <numeric>
#include <cstdio>
......@@ -75,9 +76,9 @@ void MigrationReportPacker::reportFailedJob(const tapegateway::FileToMigrateStru
//------------------------------------------------------------------------------
//reportFlush
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFlush(uint64_t nbByteWritenWithCompression){
void MigrationReportPacker::reportFlush(drives::compressionStats compressStats){
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportFlush(nbByteWritenWithCompression));
m_fifo.push(new ReportFlush(compressStats));
}
//------------------------------------------------------------------------------
//reportEndOfSession
......@@ -115,8 +116,7 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _th
successMigration->setChecksum(m_checksum);
successMigration->setFileSize(m_migratedFile.fileSize());
// successMigration->setCompressedFileSize();
// successMigration->setBlockId0(m_migratedFile.BlockId0());
// successMigration->setBlockId1();
// successMigration->setBlockId2();
......@@ -139,9 +139,10 @@ std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator end
rawSize+=(*it)->fileSize();
}
uint64_t nbByteWritenWithCompression = m_compressStats.toTape;
//we dont want compressionRatio to be equal to zero not to have a division by zero
double compressionRatio = nbByteWritenWithCompression>0 && rawSize >0 ?
1.0*rawSize/nbByteWritenWithCompression : 1.;
1.0*nbByteWritenWithCompression/rawSize : 1.;
for(std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator it = beg;
it != end ;++it){
......@@ -160,14 +161,19 @@ std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator end
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& _this){
computeCompressedSize(_this.m_listReports->successfulMigrations().begin(),
_this.m_listReports->successfulMigrations().end());
if(!_this.m_errorHappened){
_this.logReport(_this.m_listReports->successfulMigrations(),"A file was successfully written on the tape");
tapeserver::client::ClientInterface::RequestReport chrono;
try{
_this.m_client.reportMigrationResults(*(_this.m_listReports),chrono);
computeCompressedSize(_this.m_listReports->successfulMigrations().begin(),
_this.m_listReports->successfulMigrations().end());
_this.m_client.reportMigrationResults(*(_this.m_listReports),chrono);
_this.logReport(_this.m_listReports->successfulMigrations(),"A file was successfully written on the tape");
log::ScopedParamContainer container(_this.m_lc);
container.add("batch size",_this.m_listReports->successfulMigrations().size())
.add("compressed",m_compressStats.toTape)
.add("Non compressed",m_compressStats.fromHost);
_this.m_lc.log(LOG_INFO,"Reported to the client that a batch of file was written on tape");
}
catch(const castor::exception::Exception& e){
LogContext::ScopedParam sp[]={
......
......@@ -27,7 +27,7 @@
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include <list>
#include <memory>
......@@ -65,9 +65,11 @@ public:
void reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,const std::string& msg,int error_code);
/**
* Create into the MigrationReportPacker a report for the signaling a flusing on tape
*/
void reportFlush(uint64_t nbByteWritenWithCompression);
* Create into the MigrationReportPacker a report for the signaling a flusing on tape
* @param compressStats
*
*/
void reportFlush(drives::compressionStats compressStats);
/**
* Create into the MigrationReportPacker a report for the nominal end of session
......@@ -106,7 +108,7 @@ private:
virtual void execute(MigrationReportPacker& _this);
};
class ReportFlush : public Report {
uint64_t nbByteWritenWithCompression;
drives::compressionStats m_compressStats;
/**
* This function will approximate the compressed size of the files which
......@@ -128,7 +130,7 @@ private:
* @param nbByte the number of byte it really wrote to tape between
* this flush and the previous one
* */
ReportFlush(uint64_t nbByte):nbByteWritenWithCompression(nbByte){}
ReportFlush(drives::compressionStats compressStats):m_compressStats(compressStats){}
void execute(MigrationReportPacker& _this);
};
......
......@@ -21,6 +21,7 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "castor/tape/tapeserver/client/FakeClient.hpp"
#include "castor/log/StringLogger.hpp"
#include <gtest/gtest.h>
......@@ -28,6 +29,7 @@
namespace unitTests {
const std::string error="ERROR_TEST";
using namespace castor::tape;
const tapeserver::drives::compressionStats statsCompress;
using ::testing::_;
TEST(castor_tape_tapeserver_daemon, MigrationReportPackerNominal) {
......@@ -45,7 +47,7 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerNominal) {
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFlush(0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.waitThread();
......@@ -71,7 +73,7 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillure) {
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFailedJob(failed,error,-1);
mrp.reportFlush(0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSessionWithErrors(error,-1);
mrp.waitThread();
......@@ -96,7 +98,7 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillureGoodEnd) {
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFailedJob(failed,error,-1);
mrp.reportFlush(0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.waitThread();
......@@ -123,7 +125,7 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerGoodBadEnd) {
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportFlush(0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSessionWithErrors(error,-1);
mrp.waitThread();
......
......@@ -95,9 +95,11 @@ tapeFlush(const std::string& message,uint64_t bytes,uint64_t files,
.add("bytes", bytes)
.add("flushTime", flushTime);
m_logContext.log(LOG_INFO,message);
uint64_t nbByteWritenWithCompression = m_drive.nbByteWritenWithCompression();
m_reportPacker.reportFlush(nbByteWritenWithCompression);
m_stats.flushTime += flushTime;
m_reportPacker.reportFlush(m_drive.getCompression());
m_drive.clearCompressionStats();
}
//------------------------------------------------------------------------------
// isTapeWritable
......
......@@ -637,10 +637,6 @@ void drives::DriveGeneric::SCSI_inquiry() {
<< SCSI::Structures::toString(*((SCSI::Structures::inquiryData_t *) dataBuff));
}
uint64_t drives::DriveGeneric::nbByteWritenWithCompression(){
return 0;
}
drives::compressionStats drives::DriveT10000::getCompression() {
compressionStats driveCompressionStats;
......
......@@ -301,9 +301,6 @@ namespace drives {
void SCSI_inquiry();
/**
*/
virtual uint64_t nbByteWritenWithCompression();
protected:
SCSI::DeviceInfo m_SCSIInfo;
int m_tapeFD;
......
......@@ -136,8 +136,10 @@ namespace drives {
class DriveInterface {
public:
virtual ~DriveInterface(){};
virtual compressionStats getCompression() = 0;
virtual void clearCompressionStats() = 0;
virtual deviceInfo getDeviceInfo() = 0;
virtual std::string getSerialNumber() = 0;
virtual void positionToLogicalObject(uint32_t blockId) = 0;
......@@ -148,27 +150,33 @@ namespace drives {
virtual driveStatus getDriveStatus() = 0;
virtual tapeError getTapeError() = 0;
virtual void setSTBufferWrite(bool bufWrite) = 0;
virtual void fastSpaceToEOM(void) = 0;
virtual void rewind(void) = 0;
virtual void spaceToEOM(void) = 0;
virtual void spaceFileMarksBackwards(size_t count) = 0;
virtual void spaceFileMarksForward(size_t count) = 0;
virtual void unloadTape(void) = 0;
virtual void flush(void) = 0;
virtual void writeSyncFileMarks(size_t count) = 0;
virtual void writeImmediateFileMarks(size_t count) = 0;
virtual void writeBlock(const void * data, size_t count) = 0;
virtual ssize_t readBlock(void * data, size_t count) = 0;
virtual void readExactBlock(void * data, size_t count, std::string context) = 0;
virtual void readFileMark(std::string context) = 0;
virtual bool waitUntilReady(int timeoutSecond) = 0;
virtual bool isWriteProtected() = 0;
virtual bool isAtBOT() = 0;
virtual bool isAtEOD() = 0;
virtual bool isTapeBlank() = 0;
virtual bool hasTapeInPlace() = 0;
virtual uint64_t nbByteWritenWithCompression() = 0;
/**
* Member string allowing the convenient storage of the string describing
* drive location for the mount system (we get the information from TPCONFIG
......
......@@ -191,7 +191,3 @@ bool castor::tape::tapeserver::drives::FakeDrive::isTapeBlank() {
bool castor::tape::tapeserver::drives::FakeDrive::hasTapeInPlace() {
return true;
}
uint64_t castor::tape::tapeserver::drives::FakeDrive::nbByteWritenWithCompression(){
return 0;
}
......@@ -71,7 +71,6 @@ namespace drives {
virtual bool isAtEOD() ;
virtual bool isTapeBlank();
virtual bool hasTapeInPlace();
virtual uint64_t nbByteWritenWithCompression();
};
}}}}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment