RecallReportPacker.cpp 16.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
25
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
26
#include "common/log/Logger.hpp"
27

28
#include <signal.h>
29
#include <iostream>
30
#include <cxxabi.h>
31

32
namespace{
33
  struct failedReportRecallResult : public cta::exception::Exception{
34
35
36
37
    failedReportRecallResult(const std::string& s): Exception(s){}
  };
}

Victor Kotlyar's avatar
Victor Kotlyar committed
38
39
using cta::log::LogContext;
using cta::log::Param;
David COME's avatar
David COME committed
40

41
42
43
44
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
45
46
47
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
Victor Kotlyar's avatar
Victor Kotlyar committed
48
// Builds a packer bound to one retrieve mount.
//  - retrieveMount: stored as-is in m_retrieveMount (used later to report
//    drive status and to flush batches of retrieved jobs).
//  - lc: log context handed to the ReportPackerInterface base class.
// The error flag and both "thread complete" flags start out false.
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, cta::log::LogContext lc)
  : ReportPackerInterface<detail::Recall>(lc),
    m_workerThread(*this),
    m_errorHappened(false),
    m_retrieveMount(retrieveMount),
    m_tapeThreadComplete(false),
    m_diskThreadComplete(false)
{}
55
56
57
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
58
// The destructor body only takes the producer mutex.
// NOTE(review): presumably this makes destruction wait for a producer that is
// still inside one of the report*() calls - confirm against MutexLocker semantics.
RecallReportPacker::~RecallReportPacker() {
  cta::threading::MutexLocker producerGuard(m_producterProtection);
}
61
62
63
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
64
65
// Wraps the finished job in a ReportSuccessful and queues it on m_fifo, from
// where WorkerThread::run() pops it. The report is built before the producer
// mutex is taken, so only the push itself is serialized.
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob){
  std::unique_ptr<Report> pendingReport(
    new ReportSuccessful(std::move(successfulRetrieveJob)));
  cta::threading::MutexLocker producerGuard(m_producterProtection);
  // Ownership moves to the queue as a raw pointer.
  Report* const handedOver = pendingReport.release();
  m_fifo.push(handedOver);
}
69
70
71
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------  
72
73
// Wraps the failed job in a ReportError and queues it on m_fifo, from where
// WorkerThread::run() pops it. The report is built before the producer mutex
// is taken, so only the push itself is serialized.
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob){
  std::unique_ptr<Report> pendingReport(
    new ReportError(std::move(failedRetrieveJob)));
  cta::threading::MutexLocker producerGuard(m_producterProtection);
  // Ownership moves to the queue as a raw pointer.
  Report* const handedOver = pendingReport.release();
  m_fifo.push(handedOver);
}
77
78
79
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
80
void RecallReportPacker::reportEndOfSession(){
81
  cta::threading::MutexLocker ml(m_producterProtection);
82
83
  m_fifo.push(new ReportEndofSession());
}
84
85

//------------------------------------------------------------------------------
Daniele Kruse's avatar
Daniele Kruse committed
86
//reportDriveStatus
87
//------------------------------------------------------------------------------
88
// Queues a ReportDriveStatus carrying `status` under the producer mutex; the
// status is applied to the retrieve mount when the worker thread executes it.
void RecallReportPacker::reportDriveStatus(cta::common::dataStructures::DriveStatus status) {
  cta::threading::MutexLocker producerGuard(m_producterProtection);
  auto* const statusReport = new ReportDriveStatus(status);
  m_fifo.push(statusReport);
}

93
  
94
95
96
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
97
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
98
  cta::threading::MutexLocker ml(m_producterProtection);
99
100
  m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
101

102
103
104
105
106
  
//------------------------------------------------------------------------------
//reportTestGoingToEnd
//------------------------------------------------------------------------------
void RecallReportPacker::reportTestGoingToEnd(){
107
  cta::threading::MutexLocker ml(m_producterProtection);
108
109
110
  m_fifo.push(new ReportTestGoingToEnd());
}

111
112
//------------------------------------------------------------------------------
//ReportSuccessful::execute
113
//------------------------------------------------------------------------------
114
// Starts the asynchronous completion of the job, then hands the job over to
// the parent's m_successfulRetrieveJobs queue; that queue is consumed later by
// RecallReportPacker::fullCheckAndFinishAsyncExecute().
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
  m_successfulRetrieveJob->asyncComplete();
  auto& awaitingBatchFinish = parent.m_successfulRetrieveJobs;
  awaitingBatchFinish.push(std::move(m_successfulRetrieveJob));
}
118

119
120
121
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
122
// Logs the outcome of the session (INFO when parent.errorHappened() is false,
// ERR otherwise) and, when a watchdog is attached, publishes the matching
// "status" parameter on it.
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
  if (parent.errorHappened()) {
    const std::string failureText = "RecallReportPacker::EndofSession has been reported  but an error happened somewhere in the process";
    parent.m_lc.log(cta::log::ERR, failureText);
    if (!parent.m_watchdog) {
      return;
    }
    parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
    // There is a race between the initial process handling this message and
    // the end-of-session log that is printed when our process ends. Sleeping
    // half a second delays the latter.
    usleep(500*1000);
    return;
  }

  parent.m_lc.log(cta::log::INFO,"Nominal RecallReportPacker::EndofSession has been reported");
  if (!parent.m_watchdog) {
    return;
  }
  parent.m_watchdog->addParameter(cta::log::Param("status","success"));
  // Same race as in the failure path above: give the initial process half a
  // second to handle the message before our process ends.
  usleep(500*1000);
}

//------------------------------------------------------------------------------
//ReportEndofSession::goingToEnd
//------------------------------------------------------------------------------
149
// An end-of-session report always terminates the worker loop.
bool RecallReportPacker::ReportEndofSession::goingToEnd() {
  const bool terminatesWorkerLoop = true;
  return terminatesWorkerLoop;
}
152

Daniele Kruse's avatar
Daniele Kruse committed
153
154
155
156
157
//------------------------------------------------------------------------------
//ReportDriveStatus::execute
//------------------------------------------------------------------------------
// Forwards the carried drive status to the retrieve mount. When the status is
// Unmounting, the mount is additionally told that both the disk side and the
// tape side are complete (in that order).
void RecallReportPacker::ReportDriveStatus::execute(RecallReportPacker& parent){
  auto& mount = *parent.m_retrieveMount;
  mount.setDriveStatus(m_status);
  const bool unmounting =
    (m_status==cta::common::dataStructures::DriveStatus::Unmounting);
  if (unmounting) {
    mount.diskComplete();
    mount.tapeComplete();
  }
}

//------------------------------------------------------------------------------
//ReportDriveStatus::goingToEnd
//------------------------------------------------------------------------------
167
// A drive status report never terminates the worker loop.
bool RecallReportPacker::ReportDriveStatus::goingToEnd() {
  const bool terminatesWorkerLoop = false;
  return terminatesWorkerLoop;
}

171
172
173
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
174
175
// Logs the end of a session that was reported with errors and, when a
// watchdog is attached, marks the session status as "failure" on it.
//  - If the parent recorded an error, the carried message is logged at ERR
//    level together with the carried error code.
//  - Otherwise an ERR line notes that the report does not match the observed
//    state (no error was detected).
// NOTE(review): this tests parent.m_errorHappened directly, whereas
// ReportEndofSession::execute uses parent.errorHappened(), which also consults
// the watchdog - confirm the difference is intended.
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
  if(parent.m_errorHappened) {
    // The ScopedParam has to be a named local. Written as an unnamed temporary
    // it is destroyed at the end of its own statement, i.e. before log() runs,
    // so "errorCode" would never be attached to the log line below.
    LogContext::ScopedParam errorCodeParam(parent.m_lc,Param("errorCode",m_error_code));
    parent.m_lc.log(cta::log::ERR,m_message);
  }
  else{
    const std::string& msg ="RecallReportPacker::EndofSessionWithErrors has been reported  but NO error was detected during the process";
    parent.m_lc.log(cta::log::ERR,msg);
  }
  if (parent.m_watchdog) {
    parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end of our process. To delay the latter, we sleep half a second here.
    usleep(500*1000);
  }
}
191
192
193
194

//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::goingToEnd
//------------------------------------------------------------------------------
195
// An end-of-session-with-errors report always terminates the worker loop.
bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd() {
  const bool terminatesWorkerLoop = true;
  return terminatesWorkerLoop;
}

199
200
201
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
202
203
// Records on the parent that an error happened, logs the job's failure
// message at ERR level, then calls failed() on the job with the parent's
// log context.
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
  parent.m_errorHappened = true;
  auto& logContext = parent.m_lc;
  auto& failedJob = *m_failedRetrieveJob;
  logContext.log(cta::log::ERR, failedJob.failureMessage);
  failedJob.failed(logContext);
}
207

208
//------------------------------------------------------------------------------
209
210
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
211
212
213
// Keeps a reference to the packer whose report queue this thread will serve.
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent)
  : m_parent(parent)
{}
214
215
216
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
217
void RecallReportPacker::WorkerThread::run(){
218
  m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
Victor Kotlyar's avatar
Victor Kotlyar committed
219
  m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallReportPacker thread");
220
  bool endFound = false;
221
222
  
  std::list <std::unique_ptr<Report>> reportedSuccessfully;
223
224
225
226
227
  while(1) {
    std::string debugType;
    std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
    {
      cta::log::ScopedParamContainer spc(m_parent.m_lc);
228
229
230
231
232
233
234
235
      int demangleStatus;
      char * demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus);
      if (!demangleStatus) {
        spc.add("typeId", demangledReportType);
      } else {
        spc.add("typeId", typeid(*rep.get()).name());
      }
      free(demangledReportType);
236
      if (rep->goingToEnd())
237
238
        spc.add("goingToEnd", "true");
      m_parent.m_lc.log(cta::log::DEBUG, "Popping report");
239
    }
240
241
242
243
244
245
246
247
    // Record whether we found end before calling the potentially exception
    // throwing execute().)
    if (rep->goingToEnd())
      endFound=true;
    // We can afford to see any report to fail and keep passing the following ones
    // as opposed to migrations where one failure fails the session.
    try {
      rep->execute(m_parent);
248
249
250
251
252
253
      // This slightly hackish bit prevents too many calls to sub function and gettime()
      // m_parent.fullCheckAndFinishAsyncExecute will execute the shared half of the
      // request updates (individual, asynchronous is done in rep->execute(m_parent);
      if (typeid(*rep) == typeid(RecallReportPacker::ReportSuccessful) 
          && m_parent.m_successfulRetrieveJobs.size() >= m_parent.RECALL_REPORT_PACKER_FLUSH_SIZE)
        m_parent.fullCheckAndFinishAsyncExecute();
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
    } catch(const cta::exception::Exception& e){
      //we get there because to tried to close the connection and it failed
      //either from the catch a few lines above or directly from rep->execute
      cta::log::ScopedParamContainer params(m_parent.m_lc);
      params.add("exceptionWhat", e.getMessageValue())
            .add("exceptionType", typeid(e).name());
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
    } catch(const std::exception& e){
      //we get there because to tried to close the connection and it failed
      //either from the catch a few lines above or directly from rep->execute
      cta::log::ScopedParamContainer params(m_parent.m_lc);
      params.add("exceptionWhat", e.what())
            .add("exceptionType", typeid(e).name());
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
    } catch(...){
      //we get there because to tried to close the connection and it failed
      //either from the catch a few lines above or directly from rep->execute
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
284
    }
285
    if (endFound) break;
286
  }
287
  
288
  // Make sure the last batch of reports got cleaned up. 
289
  try {
290
    m_parent.fullCheckAndFinishAsyncExecute(); 
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
  } catch(const cta::exception::Exception& e){
      cta::log::ScopedParamContainer params(m_parent.m_lc);
      params.add("exceptionWhat", e.getMessageValue())
            .add("exceptionType", typeid(e).name());
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      } 
  } catch(const std::exception& e){
      cta::log::ScopedParamContainer params(m_parent.m_lc);
      params.add("exceptionWhat", e.what())
            .add("exceptionType", typeid(e).name());
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
  } catch(...){
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
  } 
  
317
318
319
320
  // Drain the fifo in case we got an exception
  if (!endFound) {
    while (1) {
      std::unique_ptr<Report> report(m_parent.m_fifo.pop());
321
      if (report->goingToEnd())
322
323
324
        break;
    }
  }
325
326
327
  // Cross check that the queue is indeed empty.
  while (m_parent.m_fifo.size()) {
    // There is at least one extra report we missed.
328
    // The drive status reports are not a problem though.
Victor Kotlyar's avatar
Victor Kotlyar committed
329
    cta::log::ScopedParamContainer spc(m_parent.m_lc);
330
331
332
333
    std::unique_ptr<Report> missedReport(m_parent.m_fifo.pop());
    spc.add("ReportType", typeid(*missedReport).name());
    if (missedReport->goingToEnd())
      spc.add("goingToEnd", "true");
334
335
    if (typeid(*missedReport) != typeid(RecallReportPacker::ReportDriveStatus))
      m_parent.m_lc.log(cta::log::ERR, "Popping missed report (memory leak)");
336
  }
Victor Kotlyar's avatar
Victor Kotlyar committed
337
  m_parent.m_lc.log(cta::log::DEBUG, "Finishing RecallReportPacker thread");
338
}
339
340
341
342
343
344
345
346

//------------------------------------------------------------------------------
//errorHappened()
//------------------------------------------------------------------------------
// True when this packer recorded an error itself or when an attached watchdog
// reports one. The watchdog is only consulted if our own flag is not set.
bool RecallReportPacker::errorHappened() {
  if (m_errorHappened) {
    return true;
  }
  if (!m_watchdog) {
    return false;
  }
  return m_watchdog->errorHappened();
}

347
348
349
//------------------------------------------------------------------------------
//fullCheckAndFinishAsyncExecute()
//------------------------------------------------------------------------------
350
351
// Hands the queue of successfully retrieved jobs, together with the log
// context, to the retrieve mount so it can finish the batch.
void RecallReportPacker::fullCheckAndFinishAsyncExecute() {
  auto& pendingJobs = m_successfulRetrieveJobs;
  m_retrieveMount->waitAndFinishSettingJobsBatchRetrieved(pendingJobs, m_lc);
}

354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
//------------------------------------------------------------------------------
//setTapeDone()
//------------------------------------------------------------------------------
// Marks the tape thread as complete; read back through allThreadsDone().
void RecallReportPacker::setTapeDone()
{
  m_tapeThreadComplete = true;
}

//------------------------------------------------------------------------------
//setDiskDone()
//------------------------------------------------------------------------------
// Marks the disk thread as complete; read back through allThreadsDone().
void RecallReportPacker::setDiskDone()
{
  m_diskThreadComplete = true;
}

//------------------------------------------------------------------------------
//allThreadsDone()
//------------------------------------------------------------------------------
// True only once both setTapeDone() and setDiskDone() have been called.
bool RecallReportPacker::allThreadsDone() {
  if (!m_tapeThreadComplete) {
    return false;
  }
  return m_diskThreadComplete;
}

375
}}}}