RecallReportPacker.cpp 8.63 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
25
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
26
#include "castor/log/Logger.hpp"
27
28
#include "log.h"
#include "serrno.h"
29

30
#include <signal.h>
31

32
namespace{
33
  struct failedReportRecallResult : public castor::exception::Exception{
34
35
36
37
    failedReportRecallResult(const std::string& s): Exception(s){}
  };
}

David COME's avatar
David COME committed
38
39
40
using castor::log::LogContext;
using castor::log::Param;

41
42
43
44
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
45
46
47
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
48
/**
 * Builds the packer.
 *
 * @param retrieveMount pointer stored in m_retrieveMount; the end-of-session
 *                      reports call complete() on it.
 * @param lc            log context handed to the ReportPackerInterface base.
 *
 * The worker thread object is bound to this packer and the error flag starts
 * out cleared.
 */
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, log::LogContext lc)
  : ReportPackerInterface<detail::Recall>(lc),
    m_workerThread(*this),
    m_errorHappened(false),
    m_retrieveMount(retrieveMount) {
}
53
54
55
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
56
/**
 * Takes m_producterProtection for the duration of the destructor body.
 * NOTE(review): presumably this waits for a producer that is still inside
 * one of the report*() methods before members are torn down - confirm.
 */
RecallReportPacker::~RecallReportPacker() {
  // Scoped lock: acquired here, released when the body exits.
  castor::server::MutexLocker producerGuard(&m_producterProtection);
}
59
60
61
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
62
63
/**
 * Queues a success report for the given retrieve job.
 *
 * @param successfulRetrieveJob job whose ownership is moved into the report.
 *
 * The report is built before the producer lock is taken; only the push onto
 * m_fifo happens under the lock.
 */
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob) {
  std::unique_ptr<Report> successReport(
    new ReportSuccessful(std::move(successfulRetrieveJob)));

  castor::server::MutexLocker producerGuard(&m_producterProtection);
  // The queue receives the raw pointer; the worker thread re-wraps it on pop.
  m_fifo.push(successReport.release());
}
67
68
69
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------  
70
71
/**
 * Queues an error report for the given retrieve job.
 *
 * @param failedRetrieveJob job whose ownership is moved into the report.
 *
 * The report is built before the producer lock is taken; only the push onto
 * m_fifo happens under the lock.
 */
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob) {
  std::unique_ptr<Report> failureReport(
    new ReportError(std::move(failedRetrieveJob)));

  castor::server::MutexLocker producerGuard(&m_producterProtection);
  // The queue receives the raw pointer; the worker thread re-wraps it on pop.
  m_fifo.push(failureReport.release());
}
75
76
77
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
78
void RecallReportPacker::reportEndOfSession(){
79
  castor::server::MutexLocker ml(&m_producterProtection);
80
81
82
  m_fifo.push(new ReportEndofSession());
}
  
83
84
85
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
86
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
87
  castor::server::MutexLocker ml(&m_producterProtection);
88
89
  m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
90

91
92
//------------------------------------------------------------------------------
//ReportSuccessful::execute
93
//------------------------------------------------------------------------------
94
/**
 * Marks the job held by this report as complete.
 *
 * @param parent the owning packer; not used by this report type.
 */
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent) {
  auto& job = *m_successfulRetrieveJob;
  job.complete();
}
97

98
99
100
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
101
/**
 * Closes the session on the retrieve mount and records the final status.
 *
 * @param parent the owning packer (log context, mount and watchdog are
 *               reached through it).
 *
 * If parent.errorHappened() is true the situation is logged at LOG_ERR and
 * the watchdog (when present) gets status "failure"; otherwise a LOG_INFO
 * line is emitted and the watchdog gets status "success". The mount is
 * completed in both cases.
 */
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent) {
  if (parent.errorHappened()) {
    // A nominal end of session was requested although something failed earlier.
    const std::string errorText(
      "RecallReportPacker::EndofSession has been reported  but an error happened somewhere in the process");
    parent.m_lc.log(LOG_ERR, errorText);
    parent.m_retrieveMount->complete();
    if (parent.m_watchdog) {
      parent.m_watchdog->addParameter(log::Param("status","failure"));
      // There is a race between the initial process handling this message
      // and the end-of-session log printed when our process ends. Sleeping
      // half a second delays the latter.
      usleep(500 * 1000);
    }
    return;
  }

  // Nominal path: no error was recorded.
  parent.m_retrieveMount->complete();
  parent.m_lc.log(LOG_INFO,"Nominal RecallReportPacker::EndofSession has been reported");
  if (parent.m_watchdog) {
    parent.m_watchdog->addParameter(log::Param("status","success"));
    // Same race as on the error path: give the initial process half a
    // second to handle the message before our process ends.
    usleep(500 * 1000);
  }
}
126
127
128
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
129
130
/**
 * Closes the session on the retrieve mount after an end of session with
 * errors was requested, and records status "failure" on the watchdog.
 *
 * @param parent the owning packer (log context, mount and watchdog are
 *               reached through it).
 *
 * When the packer's own error flag is set, the stored message is logged at
 * LOG_ERR together with the "errorCode" parameter. When it is not set, the
 * mismatch itself is logged at LOG_ERR. The mount is completed in both
 * cases.
 *
 * NOTE(review): this tests parent.m_errorHappened directly, whereas
 * ReportEndofSession::execute goes through parent.errorHappened(), which
 * also consults the watchdog - confirm the difference is intended.
 */
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
  if(parent.m_errorHappened) {
    parent.m_retrieveMount->complete();
    // The guard has to be a named object. An unnamed temporary is destroyed
    // at the end of its own statement, which (assuming ScopedParam removes
    // the parameter on destruction, as its name suggests) would drop
    // "errorCode" before the log call below.
    LogContext::ScopedParam sp(parent.m_lc,Param("errorCode",m_error_code));
    parent.m_lc.log(LOG_ERR,m_message);
  }
  else{
    // An error end of session was requested but no error had been recorded.
    const std::string msg ="RecallReportPacker::EndofSessionWithErrors has been reported  but NO error was detected during the process";
    parent.m_lc.log(LOG_ERR,msg);
    parent.m_retrieveMount->complete();
  }
  if (parent.m_watchdog) {
    parent.m_watchdog->addParameter(log::Param("status","failure"));
    // There is a race between the initial process handling this message and
    // the end-of-session log printed when our process ends. Sleeping half a
    // second delays the latter.
    usleep(500*1000);
  }
}
148
149
150
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
151
152
/**
 * Records a job failure: raises the packer's error flag, logs the job's
 * failure message at LOG_ERR, then calls failed() on the job.
 *
 * @param parent the owning packer whose error flag and log context are used.
 */
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent) {
  // Raise the flag first so that it is set even if a later call throws.
  parent.m_errorHappened = true;

  auto& job = *m_failedRetrieveJob;
  parent.m_lc.log(LOG_ERR, job.failureMessage);
  job.failed();
}
//------------------------------------------------------------------------------
157
158
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
159
160
161
/**
 * Binds the worker thread to the packer it works for.
 *
 * @param parent packer stored in m_parent.
 */
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent)
  : m_parent(parent) {
}
162
163
164
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
165
void RecallReportPacker::WorkerThread::run(){
166
167
  m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
  m_parent.m_lc.log(LOG_DEBUG, "Starting RecallReportPacker thread");
168
  try{
169
170
171
172
173
174
175
    while(1) {    
      std::unique_ptr<Report> rep(m_parent.m_fifo.pop());    
      rep->execute(m_parent);

      if(rep->goingToEnd()) {
        break;
      }
176
    }
177
  }
178
  catch(const castor::exception::Exception& e){
179
180
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
181
    m_parent.m_lc.log(LOG_ERR,"tried to report endOfSession(WithError) and got an exception, cant do much more");
182
183
184
185
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(log::Param("status","failure"));
    }
186
  }
187
  m_parent.m_lc.log(LOG_DEBUG, "Finishing RecallReportPacker thread");
188
}
189
190
191
192
193
194
195
196

//------------------------------------------------------------------------------
//errorHappened()
//------------------------------------------------------------------------------
/**
 * Tells whether an error has been seen so far.
 *
 * @return true when the packer's own error flag is set, or when a watchdog
 *         is attached and its errorHappened() returns true; false otherwise.
 */
bool RecallReportPacker::errorHappened() {
  if (m_errorHappened) {
    return true;
  }
  // The watchdog is only consulted when the packer's own flag is clear.
  return m_watchdog && m_watchdog->errorHappened();
}

197
}}}}