/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2003-2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
19
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
20
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
21
#include "catalogue/TapeFileWritten.hpp"
22
#include "common/utils/utils.hpp"
23
#include "objectstore/Backend.hpp"
24
25

#include <memory>
26
#include <numeric>
27
#include <cstdio>
28

Victor Kotlyar's avatar
Victor Kotlyar committed
29
30
using cta::log::LogContext;
using cta::log::Param;
31

32
33
34
35
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
36
37
38
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
39
/**
 * Builds a report packer attached to the given archive mount.
 *
 * The worker thread member receives a reference to this packer. The packer
 * starts with no error recorded (m_errorHappened == false) and with its
 * processing flag raised (m_continue == true).
 *
 * @param archiveMount Pointer stored in m_archiveMount; it is not dereferenced here.
 * @param lc           Log context forwarded to ReportPackerInterface.
 */
MigrationReportPacker::MigrationReportPacker(
  cta::ArchiveMount* archiveMount,
  cta::log::LogContext& lc)
  : ReportPackerInterface<detail::Migration>(lc),
    m_workerThread(*this),
    m_errorHappened(false),
    m_continue(true),
    m_archiveMount(archiveMount)
{
}
44
//------------------------------------------------------------------------------
45
//Destructor
46
//------------------------------------------------------------------------------
47
/**
 * Destructor. Its only action is to acquire m_producterProtection, the mutex
 * that every report*() producer holds while pushing to the FIFO.
 */
MigrationReportPacker::~MigrationReportPacker() {
  // NOTE(review): presumably this waits for a producer that is still inside
  // its critical section before the members are destroyed -- confirm.
  cta::threading::MutexLocker producerLock(m_producterProtection);
}
50
51
52
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------ 
David COME's avatar
David COME committed
53
void MigrationReportPacker::reportCompletedJob(
54
  std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc) {
55
  std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulArchiveJob)));
56
57
58
  cta::log::ScopedParamContainer params(lc);
  params.add("type", "ReportSuccessful");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportCompletedJob(), pushing a report.");
59
  cta::threading::MutexLocker ml(m_producterProtection);
60
61
  m_fifo.push(rep.release());
}
62
//------------------------------------------------------------------------------
63
64
65
66
67
68
69
70
71
72
73
74
75
76
//reportSkippedJob
//------------------------------------------------------------------------------ 
/**
 * Queues a ReportSkipped for an archive job that was skipped.
 *
 * The failure text stored in the report is the current local time, the short
 * host name and the caller-provided reason, separated by single spaces.
 *
 * @param skippedArchiveJob Job whose ownership is moved into the report.
 * @param failure           Reason for skipping the job.
 * @param lc                Log context used for the DEBUG trace.
 */
void MigrationReportPacker::reportSkippedJob(std::unique_ptr<cta::ArchiveJob> skippedArchiveJob, const std::string& failure,
    cta::log::LogContext& lc) {
  const std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() +
      " " + failure;
  std::unique_ptr<Report> rep(new ReportSkipped(std::move(skippedArchiveJob), failureLog));
  cta::log::ScopedParamContainer params(lc);
  // The logged type is the report class name, as in the other producers
  // (this value used to be misspelled "ReporSkipped").
  params.add("type", "ReportSkipped");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportSkippedJob(), pushing a report.");
  // Only the push itself happens under the producer mutex.
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
77
78
//reportFailedJob
//------------------------------------------------------------------------------ 
79
void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
80
        const cta::exception::Exception &ex, cta::log::LogContext & lc){
81
  std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() +
82
      " " + ex.getMessageValue();
83
  std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob), failureLog));
84
85
86
  cta::log::ScopedParamContainer params(lc);
  params.add("type", "ReportError");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportFailedJob(), pushing a report.");
87
  cta::threading::MutexLocker ml(m_producterProtection);
88
89
  m_fifo.push(rep.release());
}
90
91
//------------------------------------------------------------------------------
//reportFlush
92
//------------------------------------------------------------------------------
93
94
95
96
/**
 * Queues a ReportFlush carrying the given compression statistics.
 *
 * @param compressStats Statistics handed to the ReportFlush constructor.
 * @param lc            Log context used for the DEBUG trace.
 */
void MigrationReportPacker::reportFlush(drive::compressionStats compressStats, cta::log::LogContext& lc) {
  cta::log::ScopedParamContainer logParams(lc);
  logParams.add("type", "ReportFlush");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportFlush(), pushing a report.");

  // The report is both created and pushed while the producer mutex is held.
  cta::threading::MutexLocker producerLock(m_producterProtection);
  std::unique_ptr<Report> report{new ReportFlush(compressStats)};
  m_fifo.push(report.release());
}
101
//------------------------------------------------------------------------------
102
103
//reportTapeFull
//------------------------------------------------------------------------------
104
105
106
107
/**
 * Queues a ReportTapeFull.
 *
 * @param lc Log context used for the DEBUG trace.
 */
void MigrationReportPacker::reportTapeFull(cta::log::LogContext& lc) {
  cta::log::ScopedParamContainer logParams(lc);
  logParams.add("type", "ReportTapeFull");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportTapeFull(), pushing a report.");

  // The report is both created and pushed while the producer mutex is held.
  cta::threading::MutexLocker producerLock(m_producterProtection);
  std::unique_ptr<Report> report{new ReportTapeFull()};
  m_fifo.push(report.release());
}
//------------------------------------------------------------------------------
113
114
//reportEndOfSession
//------------------------------------------------------------------------------ 
115
116
117
118
/**
 * Queues a ReportEndofSession.
 *
 * @param lc Log context used for the DEBUG trace.
 */
void MigrationReportPacker::reportEndOfSession(cta::log::LogContext& lc) {
  cta::log::ScopedParamContainer logParams(lc);
  logParams.add("type", "ReportEndofSession");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportEndOfSession(), pushing a report.");

  // The report is both created and pushed while the producer mutex is held.
  cta::threading::MutexLocker producerLock(m_producterProtection);
  std::unique_ptr<Report> report{new ReportEndofSession()};
  m_fifo.push(report.release());
}
123
124
125
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------ 
126
127
128
129
void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int errorCode, cta::log::LogContext & lc){
  cta::log::ScopedParamContainer params(lc);
  params.add("type", "ReportEndofSessionWithErrors");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportEndOfSessionWithErrors(), pushing a report.");
130
  cta::threading::MutexLocker ml(m_producterProtection);
131
132
  std::unique_ptr<Report> rep(new ReportEndofSessionWithErrors(msg,errorCode));
  m_fifo.push(rep.release());
133
}
134
135
136
137

//------------------------------------------------------------------------------
//reportTestGoingToEnd
//------------------------------------------------------------------------------
138
139
140
141
/**
 * Queues a ReportTestGoingToEnd.
 *
 * @param lc Log context used for the DEBUG trace.
 */
void MigrationReportPacker::reportTestGoingToEnd(cta::log::LogContext& lc) {
  cta::log::ScopedParamContainer logParams(lc);
  logParams.add("type", "ReportTestGoingToEnd");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportTestGoingToEnd(), pushing a report.");

  // The report is both created and pushed while the producer mutex is held.
  cta::threading::MutexLocker producerLock(m_producterProtection);
  std::unique_ptr<Report> report{new ReportTestGoingToEnd()};
  m_fifo.push(report.release());
}

147
148
149
//------------------------------------------------------------------------------
//synchronousReportEndWithErrors
//------------------------------------------------------------------------------ 
150
151
152
void MigrationReportPacker::synchronousReportEndWithErrors(const std::string msg, int errorCode, cta::log::LogContext & lc){
  cta::log::ScopedParamContainer params(lc);
  params.add("type", "ReportEndofSessionWithErrors");
153
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::synchronousReportEndWithErrors(), reporting asynchronously session complete.");
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
  m_continue=false;
  m_archiveMount->complete();
  if(m_errorHappened) {
    cta::log::ScopedParamContainer sp(lc);
    sp.add("errorMessage", msg)
      .add("errorCode", errorCode);
    lc.log(cta::log::INFO,"Reported end of session with error to client after sending file errors");
  } else{
    const std::string& msg ="Reported end of session with error to client";
    // As a measure of safety we censor any session error which is not ENOSPC into
    // Meaningless 666 (used to be SEINTERNAL in CASTOR). ENOSPC is the only one interpreted by the tape gateway.
    if (ENOSPC != errorCode) {
      errorCode = 666;
    }
    lc.log(cta::log::INFO,msg);
  }
  if(m_watchdog) {
    m_watchdog->addParameter(cta::log::Param("status",
      ENOSPC == errorCode?"success":"failure"));
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end our process. To delay the latter, we sleep half a second here.
    usleep(500*1000);
  }
178
}
179

180
//------------------------------------------------------------------------------
181
182
//ReportSuccessful::execute
//------------------------------------------------------------------------------
183
/**
 * Moves the job held by this report into the packer's queue of successful
 * archive jobs.
 *
 * @param reportPacker Packer whose m_successfulArchiveJobs queue receives the job.
 */
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& reportPacker) {
  auto& successfulJobs = reportPacker.m_successfulArchiveJobs;
  successfulJobs.push(std::move(m_successfulArchiveJob));
}
Daniele Kruse's avatar
Daniele Kruse committed
186

187
188
189
190
191
192
193
194
//------------------------------------------------------------------------------
//ReportSkipped::execute
//------------------------------------------------------------------------------
/**
 * Handles a skipped archive job: logs it, marks its transfer as failed and
 * records a placeholder item in the packer's queue of skipped files.
 *
 * Exceptions thrown by transferFailed() that derive from
 * cta::exception::Exception are logged and not propagated.
 *
 * @param reportPacker Packer providing the log context, the archive mount and
 *                     the m_skippedFiles queue.
 */
void MigrationReportPacker::ReportSkipped::execute(MigrationReportPacker& reportPacker) {
  cta::log::LogContext& lc = reportPacker.m_lc;

  // We have no successful file to add, but we should report the failure for the file.
  {
    cta::log::ScopedParamContainer jobParams(lc);
    jobParams.add("failureLog", m_failureLog)
             .add("fileSize", m_skippedArchiveJob->archiveFile.fileSize)
             .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID);
    m_skippedArchiveJob->archiveFile.checksumBlob.addFirstChecksumToLog(jobParams);

    lc.log(cta::log::DEBUG,"In MigrationReportPacker::ReportSkipped::execute(): skipping archive job after exception.");
  }

  try {
    m_skippedArchiveJob->transferFailed(m_failureLog, lc);
  } catch (cta::objectstore::Backend::NoSuchObject & missingJob) {
    // The job is already gone from the object store: logged as a warning only.
    cta::log::ScopedParamContainer failureParams(lc);
    failureParams.add("ExceptionMSG", missingJob.getMessageValue())
                 .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID);
    lc.log(cta::log::WARNING,"In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed(), job does not exist in the objectstore.");
  } catch (cta::exception::Exception & failure) {
    cta::log::ScopedParamContainer failureParams(lc);
    failureParams.add("ExceptionMSG", failure.getMessageValue())
                 .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID);
    lc.log(cta::log::ERR,"In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed() threw an exception.");
    lc.logBacktrace(cta::log::ERR, failure.backtrace());
  }

  // Append an empty item first, then fill in the entry now stored in the queue.
  auto & skippedFiles = reportPacker.m_skippedFiles;
  skippedFiles.push(cta::catalogue::TapeItemWritten());
  auto & placeholder = skippedFiles.back();
  placeholder.fSeq = m_skippedArchiveJob->tapeFile.fSeq;
  placeholder.tapeDrive = reportPacker.m_archiveMount->getDrive();
  placeholder.vid = m_skippedArchiveJob->tapeFile.vid;
}

Daniele Kruse's avatar
Daniele Kruse committed
222
223
224
//------------------------------------------------------------------------------
//reportDriveStatus
//------------------------------------------------------------------------------
225
/**
 * Queues a ReportDriveStatus carrying a drive status and an optional reason.
 *
 * @param status Drive status to report; also logged in its string form.
 * @param reason Optional reason handed to the report.
 * @param lc     Log context used for the DEBUG trace.
 */
void MigrationReportPacker::reportDriveStatus(
  cta::common::dataStructures::DriveStatus status,
  const cta::optional<std::string> & reason,
  cta::log::LogContext & lc)
{
  cta::log::ScopedParamContainer logParams(lc);
  logParams.add("type", "ReportDriveStatus");
  logParams.add("Status", cta::common::dataStructures::toString(status));
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportDriveStatus(), pushing a report.");

  // The report is both created and pushed while the producer mutex is held.
  cta::threading::MutexLocker producerLock(m_producterProtection);
  Report* const report = new ReportDriveStatus(status, reason);
  m_fifo.push(report);
}

//------------------------------------------------------------------------------
//ReportDriveStatus::execute
//------------------------------------------------------------------------------
/**
 * Logs the stored drive status and applies it, with the stored reason, to the
 * archive mount of the packer.
 *
 * @param parent Packer providing the log context and the archive mount.
 */
void MigrationReportPacker::ReportDriveStatus::execute(MigrationReportPacker& parent) {
  cta::log::LogContext& lc = parent.m_lc;

  cta::log::ScopedParamContainer statusParams(lc);
  statusParams.add("status", cta::common::dataStructures::toString(m_status));
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::ReportDriveStatus::execute(): reporting drive status.");

  parent.m_archiveMount->setDriveStatus(m_status, m_reason);
}

244
//------------------------------------------------------------------------------
245
246
//ReportFlush::execute
//------------------------------------------------------------------------------
247
248
/**
 * Reports the batch of successful and skipped files collected since the last
 * flush to the archive mount.
 *
 * Nothing is sent when an error was recorded earlier (an ALERT is logged) or
 * when both queues are empty.
 *
 * @param reportPacker Packer providing the queues, the archive mount and the
 *                     log context.
 * @throws cta::ArchiveMount::FailedMigrationRecallResult rethrown unchanged
 *         after failTransfer() has been attempted on every job placed in the
 *         failed-to-report queue.
 */
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPacker){
  if(reportPacker.m_errorHappened){
    // This is an abnormal situation: we should never flush after an error!
    reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client");
    return;
  }

  // We can receive double flushes when the periodic flush happens
  // right before the end of session (which triggers also a flush)
  // We refrain from sending an empty report to the client in this case.
  if (reportPacker.m_successfulArchiveJobs.empty() && reportPacker.m_skippedFiles.empty()) {
    reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing.");
    return;
  }

  std::queue<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> failedToReportArchiveJobs;
  try{
    reportPacker.m_archiveMount->reportJobsBatchTransferred(reportPacker.m_successfulArchiveJobs, reportPacker.m_skippedFiles, failedToReportArchiveJobs,
      reportPacker.m_lc);
  } catch(const cta::ArchiveMount::FailedMigrationRecallResult &ex){
    while(!failedToReportArchiveJobs.empty()){
      auto archiveJob = std::move(failedToReportArchiveJobs.front());
      try{
        archiveJob->failTransfer(ex.getMessageValue(),reportPacker.m_lc);
      } catch(const cta::exception::Exception &ex2) {
        // If the failTransfer method fails, we can't do anything about it
        // beyond leaving a trace; the remaining jobs are still processed.
        cta::log::ScopedParamContainer params(reportPacker.m_lc);
        params.add("exceptionMSG", ex2.getMessageValue());
        reportPacker.m_lc.log(cta::log::WARNING,"In MigrationReportPacker::ReportFlush::execute(): failed to fail the transfer of a job that could not be reported.");
      }
      failedToReportArchiveJobs.pop();
    }
    // Rethrow the exception object being handled: "throw ex;" would throw a
    // copy of static type FailedMigrationRecallResult and slice a derived type.
    throw;
  }
}
277
278

//------------------------------------------------------------------------------
279
280
281
282
283
284
285
//ReportTapeFull::execute
//------------------------------------------------------------------------------
/**
 * Calls setTapeFull() on the archive mount of the packer.
 *
 * @param reportPacker Packer providing the archive mount.
 */
void MigrationReportPacker::ReportTapeFull::execute(MigrationReportPacker& reportPacker) {
  auto& archiveMount = *reportPacker.m_archiveMount;
  archiveMount.setTapeFull();
}

//------------------------------------------------------------------------------
286
287
//ReportEndofSession::execute
//------------------------------------------------------------------------------
288
/**
 * Ends the session: clears the packer's m_continue flag, completes the archive
 * mount, logs the outcome and updates the watchdog when one is set.
 *
 * The outcome is logged at INFO level with watchdog status "success" when no
 * error was recorded, and at ERR level with status "failure" otherwise.
 *
 * @param reportPacker Packer being terminated.
 */
void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& reportPacker){
  cta::log::LogContext& lc = reportPacker.m_lc;

  reportPacker.m_continue = false;
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::ReportEndofSession::execute(): reporting session complete.");
  reportPacker.m_archiveMount->complete();

  if (reportPacker.m_errorHappened) {
    // We have some errors
    cta::log::ScopedParamContainer errorParams(lc);
    errorParams.add("errorMessage", "Previous file errors");
    lc.log(cta::log::ERR,"Reported end of session with error to client due to previous file errors");
    if (reportPacker.m_watchdog) {
      reportPacker.m_watchdog->addParameter(cta::log::Param("status","failure"));
      // We have a race condition here between the processing of this message by
      // the initial process and the printing of the end-of-session log, triggered
      // by the end our process. To delay the latter, we sleep half a second here.
      usleep(500*1000);
    }
  } else {
    // This container receives no parameter; it is kept as in the original flow.
    cta::log::ScopedParamContainer successParams(lc);
    lc.log(cta::log::INFO,"Reported end of session to client");
    if (reportPacker.m_watchdog) {
      reportPacker.m_watchdog->addParameter(cta::log::Param("status","success"));
      // We have a race condition here between the processing of this message by
      // the initial process and the printing of the end-of-session log, triggered
      // by the end our process. To delay the latter, we sleep half a second here.
      usleep(500*1000);
    }
  }
}
317

318
319
320
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
321
/**
 * Ends a session that finished with an error: clears the packer's m_continue
 * flag, completes the archive mount, logs the outcome and updates the watchdog
 * when one is set.
 *
 * When no file error was recorded before, any stored error code other than
 * ENOSPC is replaced by 666. The watchdog status is "success" only when the
 * resulting code is ENOSPC.
 *
 * @param reportPacker Packer being terminated.
 */
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& reportPacker){
  cta::log::LogContext& lc = reportPacker.m_lc;

  reportPacker.m_continue = false;
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::ReportEndofSessionWithErrors::execute(): reporting session complete.");
  reportPacker.m_archiveMount->complete();

  if (!reportPacker.m_errorHappened) {
    // As a measure of safety we censor any session error which is not ENOSPC into
    // SEINTERNAL. ENOSPC is the only one interpreted by the tape gateway.
    if (m_errorCode != ENOSPC) {
      m_errorCode = 666;
    }
    lc.log(cta::log::INFO, "Reported end of session with error to client");
  } else {
    cta::log::ScopedParamContainer errorParams(lc);
    errorParams.add("errorMessage", m_message)
               .add("errorCode", m_errorCode);
    lc.log(cta::log::INFO,"Reported end of session with error to client after sending file errors");
  }

  if (reportPacker.m_watchdog) {
    const char* sessionStatus = "failure";
    if (ENOSPC == m_errorCode) {
      sessionStatus = "success";
    }
    reportPacker.m_watchdog->addParameter(cta::log::Param("status", sessionStatus));
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end our process. To delay the latter, we sleep half a second here.
    usleep(500*1000);
  }
}
348
349
350
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
351
352
/**
 * Handles a failed archive job: records on the packer that an error happened,
 * logs the failure and marks the job's transfer as failed.
 *
 * Exceptions thrown by transferFailed() that derive from
 * cta::exception::Exception are logged and not propagated.
 *
 * @param reportPacker Packer whose m_errorHappened flag is set and whose log
 *                     context is used.
 */
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPacker){
  cta::log::LogContext& lc = reportPacker.m_lc;
  reportPacker.m_errorHappened = true;

  {
    cta::log::ScopedParamContainer jobParams(lc);
    jobParams.add("failureLog", m_failureLog)
             .add("fileId", m_failedArchiveJob->archiveFile.archiveFileID);
    lc.log(cta::log::ERR,"In MigrationReportPacker::ReportError::execute(): failing archive job after exception.");
  }

  try {
    m_failedArchiveJob->transferFailed(m_failureLog, lc);
  } catch (cta::objectstore::Backend::NoSuchObject & missingJob) {
    // The job is already gone from the object store: logged as a warning only.
    cta::log::ScopedParamContainer failureParams(lc);
    failureParams.add("ExceptionMSG", missingJob.getMessageValue())
                 .add("fileId", m_failedArchiveJob->archiveFile.archiveFileID);
    lc.log(cta::log::WARNING,"In MigrationReportPacker::ReportError::execute(): call to m_failedArchiveJob->failed(), job does not exist in the objectstore.");
  } catch (cta::exception::Exception & failure) {
    cta::log::ScopedParamContainer failureParams(lc);
    failureParams.add("ExceptionMSG", failure.getMessageValue())
                 .add("fileId", m_failedArchiveJob->archiveFile.archiveFileID);
    lc.log(cta::log::ERR,"In MigrationReportPacker::ReportError::execute(): call to m_failedArchiveJob->failed() threw an exception.");
    lc.logBacktrace(cta::log::ERR, failure.backtrace());
  }
}
375

376
377
378
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
379
380
381
/**
 * Binds the worker thread object to the packer whose reports it will process.
 *
 * @param parent Packer stored by reference in m_parent.
 */
MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent)
  : m_parent(parent)
{
}
382
383
384
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
385
void MigrationReportPacker::WorkerThread::run(){
386
387
388
  // Create our own log context for the new thread.
  cta::log::LogContext lc = m_parent.m_lc;
  lc.pushOrReplace(cta::log::Param("thread", "ReportPacker"));
389
390
  try{
    while(m_parent.m_continue) {
391
      std::unique_ptr<Report> rep (m_parent.m_fifo.pop());
392
393
394
395
396
397
398
399
400
401
402
403
      {
        cta::log::ScopedParamContainer params(lc);
        int demangleStatus;
        char * demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus);
        if (!demangleStatus) {
          params.add("typeId", demangledReportType);
        } else {
          params.add("typeId", typeid(*rep.get()).name());
        }
        free(demangledReportType);
        lc.log(cta::log::DEBUG,"In MigrationReportPacker::WorkerThread::run(): Got a new report.");
      }
404
      rep->execute(m_parent);
405
    }
406
407
408
  } catch(const cta::exception::Exception& e){
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
409
410
    cta::log::ScopedParamContainer params(lc);
    params.add("exceptionMSG", e.getMessageValue());
411
    lc.log(cta::log::ERR, "In MigrationPacker::run(): Received a CTA exception while reporting archive mount results.");
412
    if (m_parent.m_watchdog) {
413
      m_parent.m_watchdog->addToErrorCount("Error_reporting");
Victor Kotlyar's avatar
Victor Kotlyar committed
414
      m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
415
416
417
418
    }
  } catch(const std::exception& e){
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
419
420
421
422
423
424
425
426
427
    cta::log::ScopedParamContainer params(lc);
    params.add("exceptionMSG", e.what());
    int demangleStatus;
    char * demangleExceptionType = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus);
    if (!demangleStatus) {
      params.add("exceptionType", demangleExceptionType);
    } else {
      params.add("exceptionType", typeid(e).name());
    }
428
    lc.log(cta::log::ERR, "In MigrationPacker::run(): Received a standard exception while reporting archive mount results.");
429
    if (m_parent.m_watchdog) {
430
      m_parent.m_watchdog->addToErrorCount("Error_reporting");
Victor Kotlyar's avatar
Victor Kotlyar committed
431
      m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
432
433
434
435
    }
  } catch(...){
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
436
    lc.log(cta::log::ERR, "In MigrationPacker::run(): Received an unknown exception while reporting archive mount results.");
437
    if (m_parent.m_watchdog) {
438
      m_parent.m_watchdog->addToErrorCount("Error_reporting");
Victor Kotlyar's avatar
Victor Kotlyar committed
439
      m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
440
    }
441
  }
442
443
444
  // Drain the FIFO if necessary. We know that m_continue will be 
  // set by ReportEndofSessionWithErrors or ReportEndofSession
  // TODO devise a more generic mechanism
445
  while(m_parent.m_fifo.size()) {
446
    std::unique_ptr<Report> rep (m_parent.m_fifo.pop());
447
448
449
450
451
452
453
454
455
456
    cta::log::ScopedParamContainer params(lc);
    int demangleStatus;
    char * demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus);
    if (!demangleStatus) {
      params.add("typeId", demangledReportType);
    } else {
      params.add("typeId", typeid(*rep.get()).name());
    }
    free(demangledReportType);
    lc.log(cta::log::DEBUG,"In MigrationReportPacker::WorkerThread::run(): Draining leftover.");
457
  }
458
459
460
}

}}}}