Commit 1479fd7c authored by Daniele Kruse's avatar Daniele Kruse
Browse files

More cleanup

parent 60c8b7ec
......@@ -50,7 +50,7 @@ namespace daemon {
MigrationReportPacker::MigrationReportPacker(cta::ArchiveMount *archiveMount,
castor::log::LogContext lc):
ReportPackerInterface<detail::Migration>(lc),
m_workerThread(*this),m_errorHappened(false),m_continue(true), m_archiveMount(archiveMount) {
m_workerThread(*this),m_errorHappened(false),m_continue(true) {
}
//------------------------------------------------------------------------------
//Destructore
......@@ -112,28 +112,6 @@ void MigrationReportPacker::synchronousReportEndWithErrors(const std::string msg
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& reportPacker){
// std::unique_ptr<tapegateway::FileMigratedNotificationStruct> successMigration(new tapegateway::FileMigratedNotificationStruct);
// successMigration->setFseq(m_migratedFile.fseq());
// successMigration->setFileTransactionId(m_migratedFile.fileTransactionId());
// successMigration->setId(m_migratedFile.id());
// successMigration->setNshost(m_migratedFile.nshost());
// successMigration->setFileid(m_migratedFile.fileid());
// successMigration->setBlockId0((m_blockId >> 24) & 0xFF);
// successMigration->setBlockId1((m_blockId >> 16) & 0xFF);
// successMigration->setBlockId2((m_blockId >> 8) & 0xFF);
// successMigration->setBlockId3((m_blockId >> 0) & 0xFF);
// //WARNING; Ad-hoc name of the ChecksumName !!");
// successMigration->setChecksumName("adler32");
// successMigration->setChecksum(m_checksum);
//
// successMigration->setFileSize(m_migratedFile.fileSize());
//
//// successMigration->setBlockId0(m_migratedFile.BlockId0());
//// successMigration->setBlockId1();
//// successMigration->setBlockId2();
//// successMigration->setBlockId3();
//
// reportPacker.m_listReports->addSuccessfulMigrations(successMigration.release());
reportPacker.m_successfulArchiveJobs.push(std::move(m_successfulArchiveJob));
}
//------------------------------------------------------------------------------
......@@ -168,41 +146,13 @@ std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator end
(*it)->setCompressedFileSize(validCompressedFileSize);
}
}
//------------------------------------------------------------------------------
//Report::reportFileErrors
//------------------------------------------------------------------------------
//void MigrationReportPacker::Report::reportFileErrors(MigrationReportPacker& reportPacker)
//{
// // Some errors still have to be transmitted to the client, but not the
// // successful writes which were not validated by a flush (they will be
// // discarded)
// if(reportPacker.m_listReports->failedMigrations().size()) {
// tapeserver::client::ClientInterface::RequestReport chrono;
// // First, cleanup the report of existing successes
// for (size_t i=0; i<reportPacker.m_listReports->successfulMigrations().size(); i++) {
// delete reportPacker.m_listReports->successfulMigrations()[i];
// }
// reportPacker.m_listReports->successfulMigrations().resize(0);
// // Report those errors to the client
// reportPacker.logReportWithError(reportPacker.m_listReports->failedMigrations(),
// "Will report failed file to the client before end of session");
// reportPacker.m_client.reportMigrationResults(*(reportPacker.m_listReports),chrono);
// log::ScopedParamContainer sp(reportPacker.m_lc);
// sp.add("connectDuration", chrono.connectDuration)
// .add("sendRecvDuration", chrono.sendRecvDuration)
// .add("transactionId", chrono.transactionId);
// reportPacker.m_lc.log(LOG_INFO, "Reported failed file(s) to the client before end of session");
// // Reset the report lists.
// reportPacker.m_listReports.reset(new tapegateway::FileMigrationReportList);
// }
//}
//------------------------------------------------------------------------------
//ReportFlush::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPacker){
if(!reportPacker.m_errorHappened){
tapeserver::client::ClientInterface::RequestReport chrono;
// We can receive double flushes when the periodic flush happens
// right before the end of session (which triggers also a flush)
// We refrain from sending an empty report to the client in this case.
......@@ -215,12 +165,11 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
computeCompressedSize(reportPacker.m_listReports->successfulMigrations().begin(),
reportPacker.m_listReports->successfulMigrations().end());
//reportPacker.m_client.reportMigrationResults(*(reportPacker.m_listReports),chrono);
while(reportPacker.m_successfulArchiveJobs.size()) {
std::unique_ptr<cta::ArchiveJob> successfulArchiveJob = std::move(reportPacker.m_successfulArchiveJobs.front());
while(!reportPacker.m_successfulArchiveJobs.empty()) {
std::unique_ptr<cta::ArchiveJob> job(std::move(reportPacker.m_successfulArchiveJobs.front()));
job->complete();
reportPacker.m_successfulArchiveJobs.pop();
successfulArchiveJob->complete();
}
}
reportPacker.logReport(reportPacker.m_listReports->successfulMigrations(),"A file was successfully written on the tape");
log::ScopedParamContainer container(reportPacker.m_lc);
container.add("batch size",reportPacker.m_listReports->successfulMigrations().size())
......@@ -243,7 +192,6 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
} else {
// This is an abnormal situation: we should never flush after an error!
reportPacker.m_lc.log(LOG_ALERT,"Received a flush after an error: sending file errors to client");
// reportFileErrors(reportPacker);
}
//reset (ie delete and replace) the current m_listReports.
//Thus all current reports are deleted otherwise they would have been sent again at the next flush
......@@ -254,14 +202,9 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& reportPacker){
client::ClientInterface::RequestReport chrono;
if(!reportPacker.m_errorHappened){
//reportPacker.m_client.reportEndOfSession(chrono);
reportPacker.m_archiveMount->complete();
log::ScopedParamContainer sp(reportPacker.m_lc);
sp.add("connectDuration", chrono.connectDuration)
.add("sendRecvDuration", chrono.sendRecvDuration)
.add("transactionId", chrono.transactionId);
reportPacker.m_lc.log(LOG_INFO,"Reported end of session to client");
if(reportPacker.m_watchdog) {
reportPacker.m_watchdog->addParameter(log::Param("status","success"));
......@@ -272,16 +215,11 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& r
}
}
else {
// reportFileErrors(reportPacker);
// We have some errors: report end of session as such to the client
//reportPacker.m_client.reportEndOfSessionWithError("Previous file errors",SEINTERNAL,chrono);
reportPacker.m_archiveMount->failed(cta::exception::Exception("Previous file errors"));
log::ScopedParamContainer sp(reportPacker.m_lc);
sp.add("errorMessage", "Previous file errors")
.add("errorCode", SEINTERNAL)
.add("connectDuration", chrono.connectDuration)
.add("sendRecvDuration", chrono.sendRecvDuration)
.add("transactionId", chrono.transactionId);
.add("errorCode", SEINTERNAL);
reportPacker.m_lc.log(LOG_ERR,"Reported end of session with error to client due to previous file errors");
if(reportPacker.m_watchdog) {
reportPacker.m_watchdog->addParameter(log::Param("status","failure"));
......@@ -297,18 +235,12 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& r
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& reportPacker){
client::ClientInterface::RequestReport chrono;
if(reportPacker.m_errorHappened) {
// reportFileErrors(reportPacker);
//reportPacker.m_client.reportEndOfSessionWithError(m_message,m_errorCode,chrono);
reportPacker.m_archiveMount->failed(cta::exception::Exception(m_message));
log::ScopedParamContainer sp(reportPacker.m_lc);
sp.add("errorMessage", m_message)
.add("errorCode", m_errorCode)
.add("connectDuration", chrono.connectDuration)
.add("sendRecvDuration", chrono.sendRecvDuration)
.add("transactionId", chrono.transactionId);
.add("errorCode", m_errorCode);
reportPacker.m_lc.log(LOG_INFO,"Reported end of session with error to client after sending file errors");
} else{
const std::string& msg ="Reported end of session with error to client";
......@@ -317,7 +249,6 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
if (ENOSPC != m_errorCode) {
m_errorCode = SEINTERNAL;
}
//reportPacker.m_client.reportEndOfSessionWithError(msg,m_errorCode,chrono);
reportPacker.m_archiveMount->failed(cta::exception::Exception(msg));
reportPacker.m_lc.log(LOG_INFO,msg);
}
......@@ -335,20 +266,8 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
//ReportError::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPacker){
// std::unique_ptr<tapegateway::FileErrorReportStruct> failedMigration(new tapegateway::FileErrorReportStruct);
// //failedMigration->setFileMigrationReportList(reportPacker.m_listReports.get());
// failedMigration->setErrorCode(m_error_code);
// failedMigration->setErrorMessage(m_error_msg);
// failedMigration->setFseq(m_migratedFile.fseq());
// failedMigration->setFileTransactionId(m_migratedFile.fileTransactionId());
// failedMigration->setFileid(m_migratedFile.fileid());
// failedMigration->setNshost(m_migratedFile.nshost());
// failedMigration->setPositionCommandCode(m_migratedFile.positionCommandCode());
//
// reportPacker.m_listReports->addFailedMigrations(failedMigration.release());
m_failedArchiveJob->failed();
reportPacker.m_errorHappened=true;
m_failedArchiveJob->failed();
}
//------------------------------------------------------------------------------
......@@ -362,8 +281,6 @@ m_parent(parent) {
//------------------------------------------------------------------------------
void MigrationReportPacker::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(log::Param("thread", "ReportPacker"));
client::ClientInterface::RequestReport chrono;
try{
while(m_parent.m_continue) {
std::unique_ptr<Report> rep (m_parent.m_fifo.pop());
......@@ -373,9 +290,8 @@ void MigrationReportPacker::WorkerThread::run(){
catch(const failedMigrationRecallResult& e){
//here we catch a failed report MigrationResult. We try to close and it that fails too
//we end up in the catch below
//m_parent.m_client.reportEndOfSessionWithError(e.getMessageValue(),SEINTERNAL,chrono);
m_parent.m_archiveMount->failed(e);
m_parent.logRequestReport(chrono,"Successfully closed client's session after the failed report MigrationResult");
m_parent.m_lc.log(LOG_INFO,"Successfully closed client's session after the failed report MigrationResult");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(log::Param("status","failure"));
......@@ -387,7 +303,7 @@ void MigrationReportPacker::WorkerThread::run(){
catch(const castor::exception::Exception& e){
//we get there because to tried to close the connection and it failed
//either from the catch a few lines above or directly from rep->execute
m_parent.logRequestReport(chrono,"tried to report endOfSession(WithError) and got an exception, cant do much more",LOG_ERR);
m_parent.m_lc.log(LOG_ERR,"tried to report endOfSession(WithError) and got an exception, cant do much more");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(log::Param("status","failure"));
......
......@@ -102,14 +102,6 @@ private:
public:
virtual ~Report(){}
virtual void execute(MigrationReportPacker& packer)=0;
protected:
/**
* Utility function to be shared by both ReportEndofSession and
* ReportEndofSessionWithErrors: if an error for a given file is
* recorded, we will transmit it to the client before signaling the
* end of the session.
*/
//virtual void reportFileErrors(MigrationReportPacker& reportPacker);
};
class ReportSuccessful : public Report {
const FileStruct m_migratedFile;
......
......@@ -103,11 +103,9 @@ void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
client::ClientInterface::RequestReport chrono;
if(!parent.errorHappened()){
// parent.m_client.reportEndOfSession(chrono);
parent.m_retrieveMount->complete();
parent.logRequestReport(chrono,"Nominal RecallReportPacker::EndofSession has been reported",LOG_INFO);
parent.m_lc.log(LOG_INFO,"Nominal RecallReportPacker::EndofSession has been reported");
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(log::Param("status","success"));
// We have a race condition here between the processing of this message by
......@@ -119,9 +117,7 @@ void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent)
else {
const std::string& msg ="RecallReportPacker::EndofSession has been reported but an error happened somewhere in the process";
parent.m_lc.log(LOG_ERR,msg);
// parent.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
parent.m_retrieveMount->failed(cta::exception::Exception(msg));
parent.logRequestReport(chrono,"reporting EndOfSessionWithError done",LOG_ERR);
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(log::Param("status","failure"));
// We have a race condition here between the processing of this message by
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment