Commit 60c8b7ec authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Started cleanup of previous commit :)

parent 4f6101c3
......@@ -137,9 +137,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
RecallReportPacker rrp(retrieveMount,
m_castorConf.bulkRequestRecallMaxFiles,
lc);
RecallReportPacker rrp(retrieveMount, lc);
rrp.disableBulk(); //no bulk needed anymore
RecallWatchDog rwd(15,60*10,m_intialProcess,m_driveConfig.getUnitName(),lc);
......
......@@ -49,7 +49,7 @@ namespace unitTests{
MOCK_METHOD0(reportEndOfSession, void());
MOCK_METHOD2(reportEndOfSessionWithErrors, void(const std::string,int));
MockRecallReportPacker(cta::RetrieveMount *rm,castor::log::LogContext lc):
RecallReportPacker(rm,1,lc){}
RecallReportPacker(rm,lc){}
};
TEST(castor_tape_tapeserver_daemon, DiskWriteTaskFailledBlock){
......
......@@ -44,7 +44,7 @@ namespace unitTests{
MOCK_METHOD0(reportEndOfSession, void());
MOCK_METHOD2(reportEndOfSessionWithErrors, void(const std::string,int));
MockRecallReportPacker(cta::RetrieveMount *rm, castor::log::LogContext lc):
RecallReportPacker(rm,1,lc){}
RecallReportPacker(rm,lc){}
};
struct MockTaskInjector : public RecallTaskInjector{
......
......@@ -47,10 +47,9 @@ namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount,
unsigned int reportFilePeriod,log::LogContext lc):
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, log::LogContext lc):
ReportPackerInterface<detail::Recall>(lc),
m_workerThread(*this),m_reportFilePeriod(reportFilePeriod),m_errorHappened(false), m_retrieveMount(retrieveMount){
m_workerThread(*this),m_errorHappened(false), m_retrieveMount(retrieveMount){
}
//------------------------------------------------------------------------------
......@@ -97,62 +96,9 @@ void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
// std::unique_ptr<FileSuccessStruct> successRecall(new FileSuccessStruct);
//
// successRecall->setFseq(m_recalledFile.fseq());
// successRecall->setFileTransactionId(m_recalledFile.fileTransactionId());
// successRecall->setId(m_recalledFile.id());
// successRecall->setNshost(m_recalledFile.nshost());
// successRecall->setFileid(m_recalledFile.fileid());
// successRecall->setPath(m_recalledFile.path());
// successRecall->setFileSize(m_size);
//
// //WARNING : ad hoc name of checksum algorithm
// successRecall->setChecksumName("adler32");
// successRecall->setChecksum(m_checksum);
//
// parent.m_listReports->addSuccessfulRecalls(successRecall.release());
parent.m_successfulRetrieveJobs.push(std::move(m_successfulRetrieveJob));
}
//------------------------------------------------------------------------------
//flush
//------------------------------------------------------------------------------
void RecallReportPacker::flush(){
//we dont want to send empty reports
unsigned int totalSize = m_listReports->failedRecalls().size() +
m_listReports->successfulRecalls().size();
if(totalSize==0) {
return;
}
client::ClientInterface::RequestReport chrono;
try{
// m_client.reportRecallResults(*m_listReports,chrono);
while(m_successfulRetrieveJobs.size()) {
std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob = std::move(m_successfulRetrieveJobs.front());
m_successfulRetrieveJobs.pop();
successfulRetrieveJob->complete(0,0); //TODO: put size and checksum
}
{
log::ScopedParamContainer params(m_lc);
params.add("successCount", m_listReports->successfulRecalls().size())
.add("failureCount", m_listReports->failedRecalls().size());
logRequestReport(chrono,"RecallReportList successfully transmitted to client (contents follow)");
}
logReport(m_listReports->failedRecalls(),"Reported failed recall to client");
logReport(m_listReports->successfulRecalls(),"Reported successful recall to client");
} catch(const castor::exception::Exception& e){
LogContext::ScopedParam s(m_lc, Param("exceptionCode",e.code()));
LogContext::ScopedParam ss(m_lc, Param("exceptionMessageValue", e.getMessageValue()));
LogContext::ScopedParam sss(m_lc, Param("exceptionWhat",e.what()));
const std::string msg_error="An exception was caught trying to call reportRecallResults";
m_lc.log(LOG_ERR,msg_error);
throw failedReportRecallResult(msg_error);
}
//delete the old pointer and replace it with the new one provided
//that way, all the reports that have been send are deleted (by FileReportList's destructor)
m_listReports.reset(new FileReportList);
m_successfulRetrieveJob->complete(m_checksum, m_size);
}
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
......@@ -189,17 +135,14 @@ void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent)
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
client::ClientInterface::RequestReport chrono;
if(parent.m_errorHappened) {
// parent.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
parent.m_retrieveMount->failed(cta::exception::Exception(m_message));
LogContext::ScopedParam(parent.m_lc,Param("errorCode",m_error_code));
parent.m_lc.log(LOG_ERR,m_message);
parent.m_retrieveMount->failed(cta::exception::Exception(m_message));
LogContext::ScopedParam(parent.m_lc,Param("errorCode",m_error_code));
parent.m_lc.log(LOG_ERR,m_message);
}
else{
const std::string& msg ="RecallReportPacker::EndofSessionWithErrors has been reported but NO error was detected during the process";
parent.m_lc.log(LOG_ERR,msg);
// parent.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
parent.m_lc.log(LOG_ERR,msg);
parent.m_retrieveMount->failed(cta::exception::Exception(msg));
}
if (parent.m_watchdog) {
......@@ -214,17 +157,6 @@ void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacke
//ReportError::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
// std::unique_ptr<FileErrorStruct> failed(new FileErrorStruct);
// //failedMigration->setFileMigrationReportList(parent.m_listReports.get());
// failed->setErrorCode(m_error_code);
// failed->setErrorMessage(m_error_msg);
// failed->setFseq(m_recalledFile.fseq());
// failed->setFileTransactionId(m_recalledFile.fileTransactionId());
// failed->setId(m_recalledFile.id());
// failed->setNshost(m_recalledFile.nshost());
//
// parent.m_listReports->addFailedRecalls(failed.release());
parent.m_errorHappened=true;
m_failedRetrieveJob->failed(cta::exception::Exception(m_error_msg));
}
......@@ -242,68 +174,13 @@ void RecallReportPacker::WorkerThread::run(){
m_parent.m_lc.log(LOG_DEBUG, "Starting RecallReportPacker thread");
client::ClientInterface::RequestReport chrono;
try{
while(1) {
std::unique_ptr<Report> rep (m_parent.m_fifo.pop());
/*
* this boolean is only true if it is the last turn of the loop
* == rep is ReportEndOFSession or ReportEndOFSessionWithError
*/
bool isItTheEnd = rep->goingToEnd();
/*
* if it is not the last turn, we want to execute the report
* (= insert the file into thr right list of results) BEFORE (trying to)
* flush
*/
if(!isItTheEnd){
rep->execute(m_parent);
}
//how mane files we have globally treated
unsigned int totalSize = m_parent.m_listReports->failedRecalls().size() +
m_parent.m_listReports->successfulRecalls().size();
//If we have enough reports or we are going to end the loop
// or it is the end (== unconditional flush )
// or we bypass the queuing system if the client is readtp
// then we flush
if(totalSize >= m_parent.m_reportFilePeriod || isItTheEnd ||
detail::ReportByFile == m_parent.m_reportBatching)
{
try{
m_parent.flush();
}
catch(const failedReportRecallResult& e){
//got there because we failed to report the recall results
//we have to try to close the connection.
//reportEndOfSessionWithError might throw
// m_parent.m_client.reportEndOfSessionWithError(e.getMessageValue(),SEINTERNAL,chrono);
m_parent.m_retrieveMount->failed(e);
m_parent.logRequestReport(chrono,"Successfully closed client's session after the failed report RecallResult");
if (m_parent.m_watchdog) {
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
m_parent.m_watchdog->addParameter(log::Param("status","failure"));
}
// We need to wait until the end of session is signaled from upsteam
while (!isItTheEnd) {
std::unique_ptr<Report> r(m_parent.m_fifo.pop());
isItTheEnd = r->goingToEnd();
}
break;
}
}
/*
* It is the last turn of loop, we are going to send
* an EndOfSession (WithError) to the client. We need to have flushed
* all leftover BEFORE. Because as soon as we report the end, we can not
* report any the longer the success or failure of any job
*/
if(isItTheEnd) {
rep->execute(m_parent);
break;
}
while(1) {
std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
rep->execute(m_parent);
if(rep->goingToEnd()) {
break;
}
}
}
catch(const castor::exception::Exception& e){
......@@ -316,8 +193,6 @@ void RecallReportPacker::WorkerThread::run(){
}
}
m_parent.m_lc.log(LOG_DEBUG, "Finishing RecallReportPacker thread");
//When we end up there, we might have still
}
//------------------------------------------------------------------------------
......
......@@ -39,12 +39,10 @@ class RecallReportPacker : public ReportPackerInterface<detail::Recall> {
public:
/**
* Constructor
* @param tg the client to whom we report the success/failures
* @param reportFilePeriod how often do we report to the client
* @param tg the client to whom we report the success/failures
* @param lc log context, copied du to threads
*/
RecallReportPacker(cta::RetrieveMount *retrieveMount, unsigned int reportFilePeriod,
log::LogContext lc);
RecallReportPacker(cta::RetrieveMount *retrieveMount, log::LogContext lc);
virtual ~RecallReportPacker();
......@@ -109,7 +107,7 @@ private:
u_int64_t m_size;
/**
* The successful retrieve job to be pushed in the report packer queue and reported later
* The successful retrieve job to be reported immediately
*/
std::unique_ptr<cta::RetrieveJob> m_successfulRetrieveJob;
public:
......@@ -156,11 +154,6 @@ private:
virtual void run();
} m_workerThread;
/**
* Function periodically called to report the results to the client
*/
void flush();
castor::server::Mutex m_producterProtection;
/**
......@@ -168,11 +161,6 @@ private:
*/
castor::server::BlockingQueue<Report*> m_fifo;
/**
How often do we report to the client
*/
unsigned int m_reportFilePeriod;
/**
* Is set as true as soon as we process a reportFailedJob
* That we can do a sanity check to make sure we always call
......@@ -184,11 +172,6 @@ private:
* The mount object used to send reports
*/
cta::RetrieveMount * m_retrieveMount;
/**
* The successful retrieve jobs to be reported when flushing
*/
std::queue<std::unique_ptr<cta::RetrieveJob> > m_successfulRetrieveJobs;
};
}}}}
......
......@@ -40,7 +40,7 @@ TEST(castor_tape_tapeserver_daemon, RecallReportPackerNominal) {
castor::log::StringLogger log("castor_tape_tapeserver_RecallReportPackerNominal");
castor::log::LogContext lc(log);
std::unique_ptr<cta::MockSchedulerDatabase> mdb(new cta::MockSchedulerDatabase);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),2,lc);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),lc);
rrp.startThreads();
tapegateway::FileToRecallStruct recalledFiled;
......@@ -63,7 +63,7 @@ TEST(castor_tape_tapeserver_daemon, RecallReportPackerCumulated) {
castor::log::StringLogger log("castor_tape_tapeserver_RecallReportPackerCumulated");
castor::log::LogContext lc(log);
std::unique_ptr<cta::MockSchedulerDatabase> mdb(new cta::MockSchedulerDatabase);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),2,lc);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),lc);
rrp.startThreads();
tapegateway::FileToRecallStruct recalledFiled;
......@@ -90,7 +90,7 @@ TEST(castor_tape_tapeserver_daemon, RecallReportPackerBadBadEnd) {
castor::log::LogContext lc(log);
std::unique_ptr<cta::MockSchedulerDatabase> mdb(new cta::MockSchedulerDatabase);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),2,lc);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),lc);
rrp.startThreads();
tapegateway::FileToRecallStruct recalledFiled;
......@@ -120,7 +120,7 @@ TEST(castor_tape_tapeserver_daemon, RecallReportPackerBadGoodEnd) {
castor::log::LogContext lc(log);
std::unique_ptr<cta::MockSchedulerDatabase> mdb(new cta::MockSchedulerDatabase);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),2,lc);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),lc);
rrp.startThreads();
......@@ -151,7 +151,7 @@ TEST(castor_tape_tapeserver_daemon, RecallReportPackerGoodBadEnd) {
castor::log::LogContext lc(log);
std::unique_ptr<cta::MockSchedulerDatabase> mdb(new cta::MockSchedulerDatabase);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),2,lc);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),lc);
rrp.startThreads();
tapegateway::FileToRecallStruct recalledFiled;
......@@ -180,7 +180,7 @@ TEST(castor_tape_tapeserver_daemon, RecallReportPackerFaillure) {
castor::log::LogContext lc(log);
std::unique_ptr<cta::MockSchedulerDatabase> mdb(new cta::MockSchedulerDatabase);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),2,lc);
tapeserver::daemon::RecallReportPacker rrp(dynamic_cast<cta::RetrieveMount *>((mdb->getNextMount("ll","drive")).get()),lc);
rrp.startThreads();
tapegateway::FileToRecallStruct recalledFiled;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment