RecallReportPacker.cpp 13.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
25
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
26
#include "castor/log/Logger.hpp"
27

28
#include <signal.h>
29
#include <iostream>
30

31
namespace{
32
  struct failedReportRecallResult : public cta::exception::Exception{
33
34
35
36
    failedReportRecallResult(const std::string& s): Exception(s){}
  };
}

David COME's avatar
David COME committed
37
38
39
using castor::log::LogContext;
using castor::log::Param;

40
41
42
43
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
44
45
46
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
47
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, log::LogContext lc):
  ReportPackerInterface<detail::Recall>(lc),
  m_workerThread(*this),
  m_errorHappened(false),
  m_retrieveMount(retrieveMount),
  m_tapeThreadComplete(false),
  m_diskThreadComplete(false) {
  // Everything is set up in the initialiser list above: the worker thread
  // object is bound to this packer, no error has been seen yet, and neither
  // the tape nor the disk thread has reported completion.
}
54
55
56
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
57
RecallReportPacker::~RecallReportPacker(){
  // Take and immediately drop the producer mutex. NOTE(review): presumably
  // this makes destruction wait for any report*() call still holding it —
  // TODO confirm.
  cta::threading::MutexLocker producerLock(m_producterProtection);
}
60
61
62
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
63
64
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob){
  // Queue a success report for the worker thread. The report is built outside
  // the lock. It stays owned by the unique_ptr until push() has returned, so a
  // throwing push() cannot leak it.
  std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulRetrieveJob)));
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(rep.get());
  rep.release(); // the queue owns the report from here on
}
68
69
70
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------  
71
72
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob){
  // Queue an error report for the worker thread. Ownership is only handed to
  // the queue after push() has succeeded, so a throwing push() cannot leak it.
  std::unique_ptr<Report> rep(new ReportError(std::move(failedRetrieveJob)));
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(rep.get());
  rep.release(); // the queue owns the report from here on
}
76
77
78
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
79
void RecallReportPacker::reportEndOfSession(){
80
  cta::threading::MutexLocker ml(m_producterProtection);
81
82
  m_fifo.push(new ReportEndofSession());
}
83
84

//------------------------------------------------------------------------------
Daniele Kruse's avatar
Daniele Kruse committed
85
//reportDriveStatus
86
//------------------------------------------------------------------------------
87
void RecallReportPacker::reportDriveStatus(cta::common::DriveStatus status) {
  // Queue a drive-status change to be forwarded to the retrieve mount by the
  // worker thread. Ownership is only handed to the queue after push() has
  // succeeded, so a throwing push() cannot leak the report.
  std::unique_ptr<Report> rep(new ReportDriveStatus(status));
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(rep.get());
  rep.release(); // the queue owns the report from here on
}

92
  
93
94
95
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
96
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
97
  cta::threading::MutexLocker ml(m_producterProtection);
98
99
  m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
100

101
102
103
104
105
  
//------------------------------------------------------------------------------
//reportTestGoingToEnd
//------------------------------------------------------------------------------
void RecallReportPacker::reportTestGoingToEnd(){
106
  cta::threading::MutexLocker ml(m_producterProtection);
107
108
109
  m_fifo.push(new ReportTestGoingToEnd());
}

110
111
//------------------------------------------------------------------------------
//ReportSuccessful::execute
112
//------------------------------------------------------------------------------
113
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
114
  m_successfulRetrieveJob->complete();
115
}
116

117
118
119
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
120
// Log the end of the session at INFO (no error seen) or ERR (an error was seen,
// per parent.errorHappened()). If a watchdog is attached, also record the
// matching "status" parameter on it.
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
  const bool nominal = !parent.errorHappened();
  if (nominal) {
    parent.m_lc.log(LOG_INFO, "Nominal RecallReportPacker::EndofSession has been reported");
  } else {
    const std::string& msg ="RecallReportPacker::EndofSession has been reported  but an error happened somewhere in the process";
    parent.m_lc.log(LOG_ERR, msg);
  }
  if (!parent.m_watchdog) {
    return;
  }
  parent.m_watchdog->addParameter(log::Param("status", nominal ? "success" : "failure"));
  // There is a race between the initial process handling this message and the
  // end-of-session log, which is printed when our process ends. Sleeping half
  // a second here delays the latter.
  usleep(500*1000);
}

//------------------------------------------------------------------------------
//ReportEndofSession::goingToEnd
//------------------------------------------------------------------------------
147
bool RecallReportPacker::ReportEndofSession::goingToEnd() {
148
  return true;
149
}
150

Daniele Kruse's avatar
Daniele Kruse committed
151
152
153
154
155
//------------------------------------------------------------------------------
//ReportDriveStatus::execute
//------------------------------------------------------------------------------
// Forward the drive status to the retrieve mount. When the drive is unmounting,
// also tell the mount that both the disk and the tape sides are complete.
void RecallReportPacker::ReportDriveStatus::execute(RecallReportPacker& parent){
  parent.m_retrieveMount->setDriveStatus(m_status);
  if (m_status != cta::common::DriveStatus::Unmounting) {
    return;
  }
  parent.m_retrieveMount->diskComplete();
  parent.m_retrieveMount->tapeComplete();
}

//------------------------------------------------------------------------------
//ReportDriveStatus::goingToEnd
//------------------------------------------------------------------------------
165
// A drive-status change never terminates the worker loop.
bool RecallReportPacker::ReportDriveStatus::goingToEnd() { return false; }

169
170
171
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
172
173
// Log the failed end of session. If an error was recorded (m_errorHappened),
// the carried message is logged with an "errorCode" parameter; otherwise an
// inconsistency message is logged instead. A watchdog, if attached, is told
// the session status is "failure".
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
  if(parent.m_errorHappened) {
    // Must be a named object: an unnamed ScopedParam temporary is destroyed at
    // the end of its own statement. Since ScopedParam presumably drops its
    // parameter when destroyed, "errorCode" would then be missing from the
    // log line below.
    LogContext::ScopedParam sp(parent.m_lc, Param("errorCode", m_error_code));
    parent.m_lc.log(LOG_ERR, m_message);
  } else {
    const std::string msg = "RecallReportPacker::EndofSessionWithErrors has been reported  but NO error was detected during the process";
    parent.m_lc.log(LOG_ERR, msg);
  }
  if (parent.m_watchdog) {
    parent.m_watchdog->addParameter(log::Param("status","failure"));
    // There is a race between the initial process handling this message and
    // the end-of-session log, which is printed when our process ends. Sleeping
    // half a second here delays the latter.
    usleep(500*1000);
  }
}
189
190
191
192

//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::goingToEnd
//------------------------------------------------------------------------------
193
// The failed end-of-session report is always the last one the worker runs.
bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd() { return true; }

197
198
199
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
200
201
// Record that an error happened (read later by the end-of-session reports),
// log the job's failure message, then mark the retrieve job as failed.
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
  parent.m_errorHappened = true;
  cta::RetrieveJob& job = *m_failedRetrieveJob;
  parent.m_lc.log(LOG_ERR, job.failureMessage);
  job.failed();
}
205

206
//------------------------------------------------------------------------------
207
208
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
209
210
211
// Bind the worker to the packer whose queue it will consume.
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent)
  : m_parent(parent) {}
212
213
214
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
215
void RecallReportPacker::WorkerThread::run(){
216
217
  m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
  m_parent.m_lc.log(LOG_DEBUG, "Starting RecallReportPacker thread");
218
  bool endFound = false;
219
  try{
220
221
    while(1) {
      std::string debugType;
Daniele Kruse's avatar
Daniele Kruse committed
222
      std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
223
224
225
226
227
228
229
      {
        log::ScopedParamContainer spc(m_parent.m_lc);
        spc.add("ReportType", debugType=typeid(*rep).name());
        if (rep->goingToEnd())
          spc.add("goingToEnd", "true");
        m_parent.m_lc.log(LOG_DEBUG, "Popping report");
      }
230
231
      // Record whether we found end before calling the potentially exception
      // throwing execute().)
232
233
      if (rep->goingToEnd())
        endFound=true;
234
      rep->execute(m_parent);
235
      if (endFound) break;
236
    }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
  } catch(const cta::exception::Exception& e){
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
    std::stringstream ssEx;
    ssEx << "Tried to report and got a CTA exception, cant do much more. The exception is the following: " << e.getMessageValue();
    m_parent.m_lc.log(LOG_ERR, ssEx.str());
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(log::Param("status","failure"));
    }
  } catch(const std::exception& e){
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
    std::stringstream ssEx;
    ssEx << "Tried to report and got a standard exception, cant do much more. The exception is the following: " << e.what();
    m_parent.m_lc.log(LOG_ERR, ssEx.str());
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(log::Param("status","failure"));
    }
  } catch(...){
258
259
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
260
261
262
    std::stringstream ssEx;
    ssEx << "Tried to report and got an unknown exception, cant do much more.";
    m_parent.m_lc.log(LOG_ERR, ssEx.str());
263
264
265
266
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(log::Param("status","failure"));
    }
267
  }
268
269
270
271
  // Drain the fifo in case we got an exception
  if (!endFound) {
    while (1) {
      std::unique_ptr<Report> report(m_parent.m_fifo.pop());
272
      if (report->goingToEnd())
273
274
275
        break;
    }
  }
276
277
278
279
280
281
282
283
284
285
  // Cross check that the queue is indeed empty.
  while (m_parent.m_fifo.size()) {
    // There is at least one extra report we missed.
    log::ScopedParamContainer spc(m_parent.m_lc);
    std::unique_ptr<Report> missedReport(m_parent.m_fifo.pop());
    spc.add("ReportType", typeid(*missedReport).name());
    if (missedReport->goingToEnd())
      spc.add("goingToEnd", "true");
    m_parent.m_lc.log(LOG_ERR, "Popping missed report (memory leak)");
  }
286
  m_parent.m_lc.log(LOG_DEBUG, "Finishing RecallReportPacker thread");
287
}
288
289
290
291
292
293
294
295

//------------------------------------------------------------------------------
//errorHappened()
//------------------------------------------------------------------------------
// True if a failed job was reported to this packer, or if an attached watchdog
// reports that an error happened.
bool RecallReportPacker::errorHappened() {
  if (m_errorHappened) {
    return true;
  }
  return m_watchdog && m_watchdog->errorHappened();
}

296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
//------------------------------------------------------------------------------
//setTapeDone()
//------------------------------------------------------------------------------
// Flag the tape side as finished; read back by allThreadsDone().
void RecallReportPacker::setTapeDone() { m_tapeThreadComplete = true; }

//------------------------------------------------------------------------------
//setDiskDone()
//------------------------------------------------------------------------------
// Flag the disk side as finished; read back by allThreadsDone().
void RecallReportPacker::setDiskDone() { m_diskThreadComplete = true; }

//------------------------------------------------------------------------------
//allThreadsDone()
//------------------------------------------------------------------------------
// True only once both setTapeDone() and setDiskDone() have been called.
bool RecallReportPacker::allThreadsDone() {
  if (!m_tapeThreadComplete) {
    return false;
  }
  return m_diskThreadComplete;
}

317
}}}}