/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2003-2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "common/log/Logger.hpp"
#include "common/utils/utils.hpp"
#include "objectstore/Backend.hpp"

#include <signal.h>
#include <iostream>
#include <cxxabi.h>

namespace{
  struct failedReportRecallResult : public cta::exception::Exception{
    failedReportRecallResult(const std::string& s): Exception(s){}
  };
}

using cta::log::LogContext;
using cta::log::Param;

namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, cta::log::LogContext lc):
  ReportPackerInterface<detail::Recall>(lc),
  m_workerThread(*this), m_errorHappened(false), m_retrieveMount(retrieveMount),
  m_tapeThreadComplete(false), m_diskThreadComplete(false)
{
  
}
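
// Typical lifecycle (a sketch; startThreads()/waitThread() are assumed to be the
// thin wrappers around m_workerThread declared in RecallReportPacker.hpp):
//
//   RecallReportPacker rrp(&retrieveMount, lc);
//   rrp.startThreads();                     // start the consumer WorkerThread
//   rrp.reportCompletedJob(std::move(job)); // producer threads queue reports...
//   rrp.reportEndOfSession();               // ...ending with an end-of-session
//   rrp.waitThread();                       // join the WorkerThread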
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
RecallReportPacker::~RecallReportPacker(){
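  // Taking the producer-side lock delays destruction until any producer thread
  // still pushing a report has released m_producterProtection.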
  cta::threading::MutexLocker ml(m_producterProtection);
}
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
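// Producer-side API: each report* method below wraps the event in a Report object
// and pushes it onto m_fifo under m_producterProtection; the WorkerThread pops
// and executes the reports on the consumer side.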
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob){
  std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulRetrieveJob)));
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------  
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob, const cta::exception::Exception & ex){
  std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() +
      " " + ex.getMessageValue();
  std::unique_ptr<Report> rep(new ReportError(std::move(failedRetrieveJob), failureLog));
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
void RecallReportPacker::reportEndOfSession(){
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(new ReportEndofSession());
}

//------------------------------------------------------------------------------
//reportDriveStatus
//------------------------------------------------------------------------------
void RecallReportPacker::reportDriveStatus(cta::common::dataStructures::DriveStatus status, const cta::optional<std::string> & reason) {
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(new ReportDriveStatus(status,reason));
}

//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}

//------------------------------------------------------------------------------
//reportTestGoingToEnd
//------------------------------------------------------------------------------
void RecallReportPacker::reportTestGoingToEnd(){
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(new ReportTestGoingToEnd());
}

//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
  try{
    m_successfulRetrieveJob->asyncSetSuccessful();
    parent.m_successfulRetrieveJobs.push(std::move(m_successfulRetrieveJob));
  } catch (const cta::objectstore::Backend::NoSuchObject &ex){
    cta::log::ScopedParamContainer params(parent.m_lc);
    params.add("ExceptionMSG", ex.getMessageValue())
          .add("fileId", m_successfulRetrieveJob->archiveFile.archiveFileID);
    parent.m_lc.log(cta::log::WARNING,"In RecallReportPacker::ReportSuccessful::execute(): call to m_successfulRetrieveJob->asyncSetSuccessful() failed, job does not exist in the objectstore.");
  }
}

//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
  parent.setDiskDone();
  if(!parent.errorHappened()){
    parent.m_lc.log(cta::log::INFO,"Nominal RecallReportPacker::EndofSession has been reported");
    if (parent.m_watchdog) {
      parent.m_watchdog->addParameter(cta::log::Param("status","success"));
      // We have a race condition here between the processing of this message by
      // the initial process and the printing of the end-of-session log, triggered
      // by the end of our process. To delay the latter, we sleep half a second here.
      usleep(500*1000);
    }
  }
  else {
    const std::string& msg = "RecallReportPacker::EndofSession has been reported but an error happened somewhere in the process";
    parent.m_lc.log(cta::log::ERR,msg);
    if (parent.m_watchdog) {
      parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      // We have a race condition here between the processing of this message by
      // the initial process and the printing of the end-of-session log, triggered
      // by the end of our process. To delay the latter, we sleep half a second here.
      usleep(500*1000);
    }
  }
}

//------------------------------------------------------------------------------
//ReportEndofSession::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSession::goingToEnd() {
  return true;
}

//------------------------------------------------------------------------------
//ReportDriveStatus::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportDriveStatus::execute(RecallReportPacker& parent){
  parent.m_retrieveMount->setDriveStatus(m_status,m_reason);
  if(m_status==cta::common::dataStructures::DriveStatus::Unmounting) {
    parent.setTapeDone();
    parent.setTapeComplete();
  }
}

//------------------------------------------------------------------------------
//ReportDriveStatus::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportDriveStatus::goingToEnd() {
  return false;
}

//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
  parent.setDiskDone();
  if(parent.m_errorHappened) {
    LogContext::ScopedParam(parent.m_lc,Param("errorCode",m_error_code));
    parent.m_lc.log(cta::log::ERR,m_message);
  }
  else{
    const std::string& msg = "RecallReportPacker::EndofSessionWithErrors has been reported but NO error was detected during the process";
    parent.m_lc.log(cta::log::ERR,msg);
  }
  if (parent.m_watchdog) {
    parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end of our process. To delay the latter, we sleep half a second here.
    usleep(500*1000);
  }
}

//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd() {
  return true;
}

//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
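// Marks the session as errored, logs the failure, and hands the job back to the
// scheduler via transferFailed(); a job that vanished from the objectstore is
// downgraded to a warning.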
void RecallReportPacker::ReportError::execute(RecallReportPacker& reportPacker){
  reportPacker.m_errorHappened=true;
  {
    cta::log::ScopedParamContainer params(reportPacker.m_lc);
    params.add("failureLog", m_failureLog)
          .add("fileId", m_failedRetrieveJob->archiveFile.archiveFileID);
    reportPacker.m_lc.log(cta::log::ERR,"In RecallReportPacker::ReportError::execute(): failing retrieve job after exception.");
  }
  try {
    m_failedRetrieveJob->transferFailed(m_failureLog, reportPacker.m_lc);
  } catch (const cta::objectstore::Backend::NoSuchObject &ex){
    cta::log::ScopedParamContainer params(reportPacker.m_lc);
    params.add("ExceptionMSG", ex.getMessageValue())
          .add("fileId", m_failedRetrieveJob->archiveFile.archiveFileID);
    reportPacker.m_lc.log(cta::log::WARNING,"In RecallReportPacker::ReportError::execute(): call to m_failedRetrieveJob->transferFailed() failed, job does not exist in the objectstore.");
  } catch (cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer params(reportPacker.m_lc);
    params.add("ExceptionMSG", ex.getMessageValue())
          .add("fileId", m_failedRetrieveJob->archiveFile.archiveFileID);
    reportPacker.m_lc.log(cta::log::ERR,"In RecallReportPacker::ReportError::execute(): call to m_failedRetrieveJob->transferFailed() threw an exception.");
    reportPacker.m_lc.logBacktrace(cta::log::ERR, ex.backtrace());
  }
}

//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent):
m_parent(parent) {
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void RecallReportPacker::WorkerThread::run(){
  m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
  m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallReportPacker thread");
  bool endFound = false;
  
  std::list <std::unique_ptr<Report>> reportedSuccessfully;
  cta::utils::Timer t;
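  // Successful jobs are reported in batches: the loop below flushes them once the
  // batch reaches RECALL_REPORT_PACKER_FLUSH_SIZE jobs or after
  // RECALL_REPORT_PACKER_FLUSH_TIME seconds, whichever comes first.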
  while(1) {
    std::string debugType;
    std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
    {
      cta::log::ScopedParamContainer spc(m_parent.m_lc);
      // Demangle the concrete Report type for readable logging; fall back to the
      // mangled name if demangling fails (freeing a null pointer is harmless).
      int demangleStatus;
      char * demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus);
      if (!demangleStatus) {
        spc.add("typeId", demangledReportType);
      } else {
        spc.add("typeId", typeid(*rep.get()).name());
      }
      free(demangledReportType);
      if (rep->goingToEnd())
        spc.add("goingToEnd", "true");
      m_parent.m_lc.log(cta::log::DEBUG, "Popping report");
    }
    // Record whether we found the end-of-session report before calling the
    // potentially exception-throwing execute().
    if (rep->goingToEnd())
      endFound=true;
    // We can afford to let any individual report fail and keep processing the
    // following ones, as opposed to migrations where one failure fails the session.
    try {
      rep->execute(m_parent);
      // This slightly hackish bit avoids too many calls to the sub-function and to
      // gettime(): m_parent.fullCheckAndFinishAsyncExecute() will execute the shared
      // half of the request updates (the individual, asynchronous half is done in
      // rep->execute(m_parent) above).
      if (typeid(*rep) == typeid(RecallReportPacker::ReportSuccessful) 
          && (m_parent.m_successfulRetrieveJobs.size() >= m_parent.RECALL_REPORT_PACKER_FLUSH_SIZE || t.secs() >= m_parent.RECALL_REPORT_PACKER_FLUSH_TIME )){
        m_parent.m_lc.log(cta::log::INFO,"m_parent.fullCheckAndFinishAsyncExecute()");
        m_parent.fullCheckAndFinishAsyncExecute();
        t.reset();
      }
    } catch(const cta::exception::Exception& e){
      // We get here because we tried to close the connection and it failed,
      // either from the catch a few lines above or directly from rep->execute().
      cta::log::ScopedParamContainer params(m_parent.m_lc);
      params.add("exceptionWhat", e.getMessageValue())
            .add("exceptionType", typeid(e).name());
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
    } catch(const std::exception& e){
      // We get here because we tried to close the connection and it failed,
      // either from the catch a few lines above or directly from rep->execute().
      cta::log::ScopedParamContainer params(m_parent.m_lc);
      params.add("exceptionWhat", e.what())
            .add("exceptionType", typeid(e).name());
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
    } catch(...){
      // We get here because we tried to close the connection and it failed,
      // either from the catch a few lines above or directly from rep->execute().
      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception.");
      if (m_parent.m_watchdog) {
        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
      }
    }
    if (endFound) break;
  }
  
  // Make sure the last batch of reports got cleaned up. 
  try {
    m_parent.fullCheckAndFinishAsyncExecute();
    if(m_parent.isDiskDone()){
      // m_parent.m_diskThreadComplete is set to true once a ReportEndofSession or a
      // ReportEndofSessionWithErrors has been queued. Only after fullCheckAndFinishAsyncExecute()
      // has finished can we tell the mount that the disk thread is complete.
      m_parent.m_lc.log(cta::log::DEBUG, "In RecallReportPacker::WorkerThread::run(): all disk threads are finished, telling the mount that Disk threads are complete");
      m_parent.setDiskComplete();
    }
  } catch(const cta::exception::Exception& e){
    cta::log::ScopedParamContainer params(m_parent.m_lc);
    params.add("exceptionWhat", e.getMessageValue())
          .add("exceptionType", typeid(e).name());
    m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception.");
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
    }
  } catch(const std::exception& e){
    cta::log::ScopedParamContainer params(m_parent.m_lc);
    params.add("exceptionWhat", e.what())
          .add("exceptionType", typeid(e).name());
    m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception.");
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
    }
  } catch(...){
    m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception.");
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
    }
  }
  
  // Drain the fifo in case we got an exception
  if (!endFound) {
    while (1) {
      std::unique_ptr<Report> report(m_parent.m_fifo.pop());
      if (report->goingToEnd())
        break;
    }
  }
  // Cross check that the queue is indeed empty.
  while (m_parent.m_fifo.size()) {
    // There is at least one extra report we missed.
    // The drive status reports are not a problem though.
    cta::log::ScopedParamContainer spc(m_parent.m_lc);
    std::unique_ptr<Report> missedReport(m_parent.m_fifo.pop());
    spc.add("ReportType", typeid(*missedReport).name());
    if (missedReport->goingToEnd())
      spc.add("goingToEnd", "true");
    if (typeid(*missedReport) != typeid(RecallReportPacker::ReportDriveStatus))
      m_parent.m_lc.log(cta::log::ERR, "Popping missed report (memory leak)");
  }
  m_parent.m_lc.log(cta::log::DEBUG, "Finishing RecallReportPacker thread");
}

//------------------------------------------------------------------------------
//errorHappened()
//------------------------------------------------------------------------------
bool RecallReportPacker::errorHappened() {
  return m_errorHappened || (m_watchdog && m_watchdog->errorHappened());
}

//------------------------------------------------------------------------------
//fullCheckAndFinishAsyncExecute()
//------------------------------------------------------------------------------
void RecallReportPacker::fullCheckAndFinishAsyncExecute() {
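  // Completes the shared (batched) half of the success reporting: hands the queued
  // successful jobs to the mount, which flushes their asynchronous reports.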
  m_retrieveMount->flushAsyncSuccessReports(m_successfulRetrieveJobs, m_lc);
}

//------------------------------------------------------------------------------
//setTapeDone()
//------------------------------------------------------------------------------
void RecallReportPacker::setTapeDone() {
  cta::threading::MutexLocker mutexLocker(m_mutex);
  m_tapeThreadComplete = true;
}

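//------------------------------------------------------------------------------
//setTapeComplete()
//------------------------------------------------------------------------------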
void RecallReportPacker::setTapeComplete(){
  cta::threading::MutexLocker mutexLocker(m_mutex);
  m_retrieveMount->tapeComplete();
}

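//------------------------------------------------------------------------------
//setDiskComplete()
//------------------------------------------------------------------------------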
void RecallReportPacker::setDiskComplete(){
  cta::threading::MutexLocker mutexLocker(m_mutex);
  m_retrieveMount->diskComplete();
}

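//------------------------------------------------------------------------------
//isDiskDone()
//------------------------------------------------------------------------------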
bool RecallReportPacker::isDiskDone(){
  cta::threading::MutexLocker mutexLocker(m_mutex);
  return m_diskThreadComplete;
}

//------------------------------------------------------------------------------
//setDiskDone()
//------------------------------------------------------------------------------
void RecallReportPacker::setDiskDone() {
  cta::threading::MutexLocker mutexLocker(m_mutex);
  m_diskThreadComplete = true;
}

//------------------------------------------------------------------------------
//allThreadsDone()
//------------------------------------------------------------------------------
bool RecallReportPacker::allThreadsDone() {
  return m_tapeThreadComplete && m_diskThreadComplete;
}

}}}} // namespace castor::tape::tapeserver::daemon