/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2003-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "common/log/Logger.hpp"
#include "common/utils/utils.hpp"
#include "objectstore/Backend.hpp"
#include <cxxabi.h>   // abi::__cxa_demangle()
#include <cstdlib>    // free()
#include <list>       // std::list
#include <memory>     // std::unique_ptr
#include <typeinfo>   // typeid()
#include <unistd.h>   // usleep()
namespace{
struct failedReportRecallResult : public cta::exception::Exception{
failedReportRecallResult(const std::string& s): Exception(s){}
};
}
using cta::log::LogContext;
using cta::log::Param;
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, cta::log::LogContext lc):
ReportPackerInterface(lc),
m_workerThread(*this), m_errorHappened(false), m_retrieveMount(retrieveMount),
m_tapeThreadComplete(false), m_diskThreadComplete(false)
{
}
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
RecallReportPacker::~RecallReportPacker(){
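// Taking the producer-side lock serializes destruction with any producer still
// inside one of the report*() methods, so the FIFO cannot be torn down while a
// push is in flight.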
cta::threading::MutexLocker ml(m_producterProtection);
}
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob){
std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulRetrieveJob)));
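// Ownership of the report passes to the FIFO on release(); the worker thread
// reclaims it into a unique_ptr when popping.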
cta::threading::MutexLocker ml(m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob, const cta::exception::Exception & ex){
std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() +
" " + ex.getMessageValue();
std::unique_ptr<Report> rep(new ReportError(std::move(failedRetrieveJob), failureLog));
cta::threading::MutexLocker ml(m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
void RecallReportPacker::reportEndOfSession(){
cta::threading::MutexLocker ml(m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
//------------------------------------------------------------------------------
//reportDriveStatus
//------------------------------------------------------------------------------
void RecallReportPacker::reportDriveStatus(cta::common::dataStructures::DriveStatus status, const cta::optional<std::string> & reason) {
cta::threading::MutexLocker ml(m_producterProtection);
m_fifo.push(new ReportDriveStatus(status,reason));
}
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
cta::threading::MutexLocker ml(m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
//reportTestGoingToEnd
//------------------------------------------------------------------------------
void RecallReportPacker::reportTestGoingToEnd(){
cta::threading::MutexLocker ml(m_producterProtection);
m_fifo.push(new ReportTestGoingToEnd());
}
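//------------------------------------------------------------------------------
// Producer-side usage, as a sketch only: the disk/tape session tasks are the
// expected callers, and "packer"/"job" are illustrative names, not real symbols.
//
//   packer.reportCompletedJob(std::move(job));   // once per recalled file
//   packer.reportFailedJob(std::move(job), ex);  // once per failed file
//   packer.reportEndOfSession();                 // once, when the session ends
//------------------------------------------------------------------------------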
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
try{
m_successfulRetrieveJob->asyncSetSuccessful();
parent.m_successfulRetrieveJobs.push(std::move(m_successfulRetrieveJob));
} catch (const cta::objectstore::Backend::NoSuchObject &ex){
cta::log::ScopedParamContainer params(parent.m_lc);
params.add("ExceptionMSG", ex.getMessageValue())
.add("fileId", m_successfulRetrieveJob->archiveFile.archiveFileID);
parent.m_lc.log(cta::log::WARNING,"In RecallReportPacker::ReportSuccessful::execute(): call to m_successfulRetrieveJob->asyncSetSuccessful() failed, job does not exist in the objectstore.");
}
}
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
parent.setDiskDone();
if(!parent.errorHappened()){
parent.m_lc.log(cta::log::INFO,"Nominal RecallReportPacker::EndofSession has been reported");
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(cta::log::Param("status","success"));
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
// by the end of our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
}
else {
const std::string& msg ="RecallReportPacker::EndofSession has been reported but an error happened somewhere in the process";
parent.m_lc.log(cta::log::ERR,msg);
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
// by the end of our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
}
}
//------------------------------------------------------------------------------
//ReportEndofSession::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSession::goingToEnd() {
return true;
}
//------------------------------------------------------------------------------
//ReportDriveStatus::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportDriveStatus::execute(RecallReportPacker& parent){
parent.m_retrieveMount->setDriveStatus(m_status,m_reason);
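// Unmounting is the tape thread's final status report: mark the tape side done
// and tell the mount that the tape thread is complete.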
if(m_status==cta::common::dataStructures::DriveStatus::Unmounting) {
parent.setTapeDone();
parent.setTapeComplete();
}
}
//------------------------------------------------------------------------------
//ReportDriveStatus::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportDriveStatus::goingToEnd() {
return false;
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
parent.setDiskDone();
if(parent.m_errorHappened) {
// Name the scoped param so it stays alive for the log call below (an unnamed
// temporary would be destroyed before the parameter could take effect).
LogContext::ScopedParam sp(parent.m_lc, Param("errorCode", m_error_code));
parent.m_lc.log(cta::log::ERR,m_message);
}
else{
const std::string& msg ="RecallReportPacker::EndofSessionWithErrors has been reported but NO error was detected during the process";
parent.m_lc.log(cta::log::ERR,msg);
}
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
// by the end of our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd() {
return true;
}
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportError::execute(RecallReportPacker& reportPacker){
reportPacker.m_errorHappened=true;
{
cta::log::ScopedParamContainer params(reportPacker.m_lc);
params.add("failureLog", m_failureLog)
.add("fileId", m_failedRetrieveJob->archiveFile.archiveFileID);
reportPacker.m_lc.log(cta::log::ERR,"In RecallReportPacker::ReportError::execute(): failing retrieve job after exception.");
}
try {
m_failedRetrieveJob->transferFailed(m_failureLog, reportPacker.m_lc);
} catch (const cta::objectstore::Backend::NoSuchObject &ex){
cta::log::ScopedParamContainer params(reportPacker.m_lc);
params.add("ExceptionMSG", ex.getMessageValue())
.add("fileId", m_failedRetrieveJob->archiveFile.archiveFileID);
reportPacker.m_lc.log(cta::log::WARNING,"In RecallReportPacker::ReportError::execute(): call to m_failedRetrieveJob->transferFailed() failed because the job does not exist in the objectstore.");
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer params(reportPacker.m_lc);
params.add("ExceptionMSG", ex.getMessageValue())
.add("fileId", m_failedRetrieveJob->archiveFile.archiveFileID);
reportPacker.m_lc.log(cta::log::ERR,"In RecallReportPacker::ReportError::execute(): call to m_failedRetrieveJob->transferFailed() threw an exception.");
reportPacker.m_lc.logBacktrace(cta::log::ERR, ex.backtrace());
}
}
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent):
m_parent(parent) {
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
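// Consumer loop: pops the reports queued by the producer-side report*() calls,
// executes them, and flushes the batched successful jobs through
// fullCheckAndFinishAsyncExecute() once RECALL_REPORT_PACKER_FLUSH_SIZE jobs
// have accumulated or RECALL_REPORT_PACKER_FLUSH_TIME seconds have elapsed,
// whichever comes first.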
void RecallReportPacker::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallReportPacker thread");
bool endFound = false;
std::list<std::unique_ptr<cta::RetrieveJob> > reportedSuccessfully;
cta::utils::Timer t;
while(1) {
std::string debugType;
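// The FIFO pop blocks until a producer pushes a report; we take the raw
// pointer straight back into a unique_ptr so every exit path frees it.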
std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
{
cta::log::ScopedParamContainer spc(m_parent.m_lc);
int demangleStatus;
char * demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus);
if (!demangleStatus) {
spc.add("typeId", demangledReportType);
} else {
spc.add("typeId", typeid(*rep.get()).name());
}
free(demangledReportType);
if (rep->goingToEnd())
spc.add("goingToEnd", "true");
m_parent.m_lc.log(cta::log::DEBUG, "Popping report");
}
// Record whether we found the end before calling the potentially
// exception-throwing execute().
if (rep->goingToEnd())
endFound=true;
// We can afford to let any single report fail and keep processing the
// following ones, as opposed to migrations where one failure fails the session.
try {
rep->execute(m_parent);
// This slightly hackish bit prevents too many calls to sub-functions and gettime().
// m_parent.fullCheckAndFinishAsyncExecute() will execute the shared half of the
// request updates (the individual, asynchronous half is done in rep->execute(m_parent)).
if (typeid(*rep) == typeid(RecallReportPacker::ReportSuccessful)
&& (m_parent.m_successfulRetrieveJobs.size() >= m_parent.RECALL_REPORT_PACKER_FLUSH_SIZE || t.secs() >= m_parent.RECALL_REPORT_PACKER_FLUSH_TIME )){
m_parent.m_lc.log(cta::log::INFO,"In RecallReportPacker::WorkerThread::run(): flushing a batch of successful reports (fullCheckAndFinishAsyncExecute()).");
m_parent.fullCheckAndFinishAsyncExecute();
t.reset();
}
} catch(const cta::exception::Exception& e){
// We get here because we tried to close the connection and it failed,
// either from the catch a few lines above or directly from rep->execute().
cta::log::ScopedParamContainer params(m_parent.m_lc);
params.add("exceptionWhat", e.getMessageValue())
.add("exceptionType", typeid(e).name());
m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception.");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
}
} catch(const std::exception& e){
// We get here because we tried to close the connection and it failed,
// either from the catch a few lines above or directly from rep->execute().
cta::log::ScopedParamContainer params(m_parent.m_lc);
params.add("exceptionWhat", e.what())
.add("exceptionType", typeid(e).name());
m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception.");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
}
} catch(...){
// We get here because we tried to close the connection and it failed,
// either from the catch a few lines above or directly from rep->execute().
m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception.");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
}
}
if (endFound) break;
}
// Make sure the last batch of reports got cleaned up.
try {
m_parent.fullCheckAndFinishAsyncExecute();
if(m_parent.isDiskDone()){
// m_parent.m_diskThreadComplete is set to true once a ReportEndofSession or a
// ReportEndofSessionWithErrors has been queued. Only after fullCheckAndFinishAsyncExecute()
// has finished can we tell the mount that the disk thread is complete.
m_parent.m_lc.log(cta::log::DEBUG, "In RecallReportPacker::WorkerThread::run(): all disk threads are finished, telling the mount that Disk threads are complete");
m_parent.setDiskComplete();
}
} catch(const cta::exception::Exception& e){
cta::log::ScopedParamContainer params(m_parent.m_lc);
params.add("exceptionWhat", e.getMessageValue())
.add("exceptionType", typeid(e).name());
m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception.");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
}
} catch(const std::exception& e){
cta::log::ScopedParamContainer params(m_parent.m_lc);
params.add("exceptionWhat", e.what())
.add("exceptionType", typeid(e).name());
m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception.");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
}
} catch(...){
m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception.");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
}
}
// Drain the fifo in case we got an exception
if (!endFound) {
while (1) {
std::unique_ptr<Report> report(m_parent.m_fifo.pop());
if (report->goingToEnd())
break;
}
}
// Cross check that the queue is indeed empty.
while (m_parent.m_fifo.size()) {
// There is at least one extra report we missed.
// The drive status reports are not a problem though.
cta::log::ScopedParamContainer spc(m_parent.m_lc);
std::unique_ptr<Report> missedReport(m_parent.m_fifo.pop());
spc.add("ReportType", typeid(*missedReport).name());
if (missedReport->goingToEnd())
spc.add("goingToEnd", "true");
if (typeid(*missedReport) != typeid(RecallReportPacker::ReportDriveStatus))
m_parent.m_lc.log(cta::log::ERR, "Popping missed report (memory leak)");
}
m_parent.m_lc.log(cta::log::DEBUG, "Finishing RecallReportPacker thread");
}
//------------------------------------------------------------------------------
//errorHappened()
//------------------------------------------------------------------------------
bool RecallReportPacker::errorHappened() {
return m_errorHappened || (m_watchdog && m_watchdog->errorHappened());
}
//------------------------------------------------------------------------------
//fullCheckAndFinishAsyncExecute()
//------------------------------------------------------------------------------
void RecallReportPacker::fullCheckAndFinishAsyncExecute() {
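// Flush the accumulated successful-job reports to the client in one batched,
// asynchronous operation on the mount.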
m_retrieveMount->flushAsyncSuccessReports(m_successfulRetrieveJobs, m_lc);
}
//------------------------------------------------------------------------------
//setTapeDone()
//------------------------------------------------------------------------------
void RecallReportPacker::setTapeDone() {
cta::threading::MutexLocker mutexLocker(m_mutex);
m_tapeThreadComplete = true;
}
//------------------------------------------------------------------------------
//setTapeComplete()
//------------------------------------------------------------------------------
void RecallReportPacker::setTapeComplete(){
cta::threading::MutexLocker mutexLocker(m_mutex);
m_retrieveMount->tapeComplete();
}
//------------------------------------------------------------------------------
//setDiskComplete()
//------------------------------------------------------------------------------
void RecallReportPacker::setDiskComplete(){
cta::threading::MutexLocker mutexLocker(m_mutex);
m_retrieveMount->diskComplete();
}
//------------------------------------------------------------------------------
//isDiskDone()
//------------------------------------------------------------------------------
bool RecallReportPacker::isDiskDone(){
cta::threading::MutexLocker mutexLocker(m_mutex);
return m_diskThreadComplete;
}
//------------------------------------------------------------------------------
//setDiskDone()
//------------------------------------------------------------------------------
void RecallReportPacker::setDiskDone() {
cta::threading::MutexLocker mutexLocker(m_mutex);
m_diskThreadComplete = true;
}
//------------------------------------------------------------------------------
//allThreadsDone()
//------------------------------------------------------------------------------
bool RecallReportPacker::allThreadsDone() {
// Take the mutex so the two flags are read consistently with their setters.
cta::threading::MutexLocker mutexLocker(m_mutex);
return m_tapeThreadComplete && m_diskThreadComplete;
}
}}}}