TaskWatchDog.hpp 18.4 KB
Newer Older
1
2
/*
 * @project        The CERN Tape Archive (CTA)
3
 * @copyright      Copyright(C) 2003-2021 CERN
4
5
6
7
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
David COME's avatar
David COME committed
8
 *
9
10
11
12
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
David COME's avatar
David COME committed
13
 *
14
15
16
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
David COME's avatar
David COME committed
17
18
19

#pragma once 

20

21
22
#include "common/threading/AtomicFlag.hpp"
#include "common/threading/BlockingQueue.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
23
#include "common/log/LogContext.hpp"
24
#include "tapeserver/daemon/TapedProxy.hpp"
25
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
26
#include "common/Timer.hpp"
27
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
28
#include "scheduler/TapeMount.hpp"
29
#include "DataTransferSession.hpp"
30

31
#include <map>
Steven Murray's avatar
Steven Murray committed
32
33
#include <unistd.h>

David COME's avatar
David COME committed
34
35
36
37
38
namespace castor {

namespace tape {
namespace tapeserver {
namespace daemon {
39
/**
40
 * Virtual class for watching tape read or write operation (mostly complete)
41
 */
42
class TaskWatchDog : private cta::threading::Thread{
43
  friend class DataTransferSession;
44
45
46
47
protected:
  /**
   * The mutex protecting updates and the worker thread from each other
   */
48
  cta::threading::Mutex m_mutex;
49
  
50
  /**
51
52
53
   *  Number of blocks we moved since the last update. Has to be atomic because it is 
   *  updated from the outside 
   */
54
  uint64_t m_TapeBytesMovedMoved;
55
  
56
57
58
59
60
61
62
63
64
65
66
67
68
  /**
   * Statistics of the current session
   */
  TapeSessionStats m_stats;
  
  /**
   * Did we set the stats at least once?
   */
  bool m_statsSet;
  
  /**
   * A timer allowing estimation of the total timer for crashing sessions
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
69
  cta::utils::Timer m_tapeThreadTimer;
70
  
71
72
  /**
   *  Timer for regular heartbeat reports to parent process
73
   */  
Victor Kotlyar's avatar
Victor Kotlyar committed
74
  cta::utils::Timer m_reportTimer;
75
76
  
  /*
77
   *  How long since we last logged a warning?
78
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
79
  cta::utils::Timer m_blockMovementReportTimer;
80
  
81
82
  /**
   * how long since the last block movement?
83
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
84
  cta::utils::Timer m_blockMovementTimer;
85
  
86
87
  /**
   * How fast should we tick?
88
   */
89
  const double m_pollPeriod;
90
  
91
92
  /**
   *  How often to we send heartbeat notifications (in second)
93
   */
94
  const double m_reportPeriod; 
95
  
96
97
  /**
   *  How long to we have to wait before saying we are stuck (in second)
98
   */
99
  const double m_stuckPeriod;
100
101
  
  /*
102
   *  Atomic flag to stop the thread's loop
103
   */
104
  cta::threading::AtomicFlag m_stopFlag;
105
106
  
  /*
107
   *  The proxy that will receive or heartbeat notifications
108
   */
109
  cta::tape::daemon::TapedProxy& m_initialProcess;
110
  
111
112
113
114
115
116
  /**
   * Reference to the current mount. Used to report performance statistics to object
   * store.
   */
  cta::TapeMount & m_mount;

117
118
119
120
  /**
   * The drive unit name to report
   */
  const std::string m_driveUnitName;
121
122
123
  /*
   *  Is the system at _this very moment_ reading or writing on tape
   */
124
  bool m_fileBeingMoved;
125
126
127
  /*
   * Logging system  
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
128
  cta::log::LogContext m_lc;
129
130
131
132
133
  
  /*
   * Member function actually logging the file.
   */
  virtual void logStuckFile() = 0;
134
135
136
137
  
  /**
   * One offs parameters to be sent to the initial process
   */
138
139
140
141
142
143
  cta::threading::BlockingQueue<cta::log::Param> m_toAddParamsQueue;

  /**
   * One offs parameters to be deleted from the initial process
   */
  cta::threading::BlockingQueue<std::string> m_toDeleteParamsQueue;
144
145
  
  /**
146
   * Map of all error counts
147
148
   */
  std::map<std::string, uint32_t> m_errorCounts;
149

150
151
152
153
154
155
  /**
   * Send the statistics to the initial process. m_mutex should be taken before
   * calling.
   */
  void reportStats() {
    // Shortcut definitions
Victor Kotlyar's avatar
Victor Kotlyar committed
156
    typedef cta::log::Param Param;
157
158
159
    if (m_statsSet) {
      // Build the statistics to be logged
      std::list<Param> paramList;
160
161
162
163
164
165
      // for delivery time we estimate it ourselves before getting the
      // final stats at the end of the last disk thread. This allows proper
      // estimation of statistics for data transfer sessions that get killed
      // before completion.
      const double deliveryTime = 
        m_stats.deliveryTime?m_stats.deliveryTime:m_tapeThreadTimer.secs();
166
167
168
169
      // total time is a special case. We will estimate it ourselves before
      // getting the final stats (at the end of the thread). This will allow
      // proper estimation of statistics for tape sessions that get killed
      // before completion.
170
171
172
173
      const double totalTime = m_stats.totalTime?m_stats.totalTime:m_tapeThreadTimer.secs();
      /** Time beetwen fineshed tape thread and finished disk threads */
      const double drainingTime = 
        deliveryTime > totalTime?deliveryTime-totalTime: 0.0;
174
175
      bool wasTapeMounted = true;
      if(m_stats.mountTime == 0.0){
176
177
178
        //Tape was not mounted, we add a message to tell that no physical mount has been
        //triggered
        wasTapeMounted = false;
179
180
      }
      paramList.push_back(Param("wasTapeMounted",wasTapeMounted));
181
182
183
184
185
186
187
      paramList.push_back(Param("mountTime", m_stats.mountTime));
      paramList.push_back(Param("positionTime", m_stats.positionTime));
      paramList.push_back(Param("waitInstructionsTime", m_stats.waitInstructionsTime));
      paramList.push_back(Param("waitFreeMemoryTime", m_stats.waitFreeMemoryTime));
      paramList.push_back(Param("waitDataTime", m_stats.waitDataTime));
      paramList.push_back(Param("waitReportingTime", m_stats.waitReportingTime));
      paramList.push_back(Param("checksumingTime", m_stats.checksumingTime));
188
      paramList.push_back(Param("readWriteTime", m_stats.readWriteTime));
189
190
191
      paramList.push_back(Param("flushTime", m_stats.flushTime));
      paramList.push_back(Param("unloadTime", m_stats.unloadTime));
      paramList.push_back(Param("unmountTime", m_stats.unmountTime));
192
      paramList.push_back(Param("encryptionControlTime", m_stats.encryptionControlTime));
193
      paramList.push_back(Param("transferTime", m_stats.transferTime()));
194
      paramList.push_back(Param("totalTime", totalTime));
195
196
      paramList.push_back(Param("deliveryTime", deliveryTime));
      paramList.push_back(Param("drainingTime", drainingTime));
197
198
199
200
201
202
203
204
      paramList.push_back(Param("dataVolume", m_stats.dataVolume));
      paramList.push_back(Param("filesCount", m_stats.filesCount));
      paramList.push_back(Param("headerVolume", m_stats.headerVolume));
      // Compute performance
      paramList.push_back(Param("payloadTransferSpeedMBps", totalTime?1.0*m_stats.dataVolume
                /1000/1000/totalTime:0.0));
      paramList.push_back(Param("driveTransferSpeedMBps", totalTime?1.0*(m_stats.dataVolume+m_stats.headerVolume)
                /1000/1000/totalTime:0.0));
205
206
207
      if(m_mount.getMountType() == cta::common::dataStructures::MountType::Retrieve){
	paramList.push_back(Param("repackFilesCount",m_stats.repackFilesCount));
	paramList.push_back(Param("userFilesCount",m_stats.userFilesCount));
Michael Davis's avatar
Michael Davis committed
208
	paramList.push_back(Param("verifiedFilesCount",m_stats.verifiedFilesCount));
209
210
	paramList.push_back(Param("repackBytesCount",m_stats.repackBytesCount));
	paramList.push_back(Param("userBytesCount",m_stats.userBytesCount));
Michael Davis's avatar
Michael Davis committed
211
	paramList.push_back(Param("verifiedBytesCount",m_stats.verifiedBytesCount));
212
      }
213
214
215
216
217
      // Ship the logs to the initial process
      m_initialProcess.addLogParams(m_driveUnitName, paramList);
    }
  }
  
218
  /**
219
   * Thread's loop
220
   */
221
  void run(){
222
    // Shortcut definitions
Victor Kotlyar's avatar
Victor Kotlyar committed
223
    typedef cta::log::Param Param;
224
225
226
227
    // reset timers as we don't know how long it took before the thread started
    m_reportTimer.reset();
    m_blockMovementReportTimer.reset();
    m_blockMovementTimer.reset();
228
229
    while(!m_stopFlag) {
      
230
      // Critical section block for internal watchdog
231
      {
232
        cta::threading::MutexLocker locker(m_mutex);
David COME's avatar
David COME committed
233
        //useful if we are stuck on a particular file
234
235
236
        if(m_fileBeingMoved && 
           m_blockMovementTimer.secs()>m_stuckPeriod &&
           m_blockMovementReportTimer.secs()>m_stuckPeriod){
237
          // We are stuck while moving a file. We will just log that.
238
239
          logStuckFile();
          m_blockMovementReportTimer.reset();
240
        }
241
      }
242
243
244
245
246
247
      
      // Send any one-off parameter
      {
        std::list<Param> params;
        // This is thread safe because we are the only consumer:
        // a non-zero size guarantees we will find something.
248
249
        while (m_toAddParamsQueue.size())
          params.push_back(m_toAddParamsQueue.pop());
250
251
252
253
        if (params.size()) {
          m_initialProcess.addLogParams(m_driveUnitName, params);
        }
      }
David COME's avatar
David COME committed
254

255
256
257
258
259
260
261
262
263
264
265
266
      // Send any one-off parameter to delete
      {
        std::list<std::string> params;
        // This is thread safe because we are the only consumer:
        // a non-zero size guarantees we will find something.
        while (m_toDeleteParamsQueue.size())
          params.push_back(m_toDeleteParamsQueue.pop());
        if (params.size()) {
          m_initialProcess.deleteLogParams(m_driveUnitName, params);
        }
      }

David COME's avatar
David COME committed
267
      //heartbeat to notify activity to the mother
268
      // and transmit statistics
269
      if(m_reportTimer.secs() > m_reportPeriod){
270
        cta::threading::MutexLocker locker(m_mutex);
Victor Kotlyar's avatar
Victor Kotlyar committed
271
        m_lc.log(cta::log::DEBUG,"going to report");
272
        m_reportTimer.reset();
273
        m_initialProcess.reportHeartbeat(m_TapeBytesMovedMoved, 0);
274
        reportStats(); 
275
276
277
278
279
280
281
282
        try {
          m_mount.setTapeSessionStats(m_stats);
        } catch (cta::exception::Exception & ex) {
          cta::log::ScopedParamContainer params (m_lc);
          params.add("exceptionMessage", ex.getMessageValue());
          m_lc.log(cta::log::WARNING, "In TaskWatchDog::run(): failed to set tape session stats in sched. DB. Skipping.");
        }

283
284
      } 
      else{
285
        usleep(m_pollPeriod*1000*1000);
286
287
      }
    }
288
    // At the end of the run, make sure we push the last stats to the initial
289
    // process and that we process any remaining parameter.
290
    {
291
      cta::threading::MutexLocker locker(m_mutex);
292
      reportStats();
293
294
295
296
297
298
299
      try {
        m_mount.setTapeSessionStats(m_stats);
      } catch (cta::exception::Exception & ex) {
        cta::log::ScopedParamContainer params (m_lc);
        params.add("exceptionMessage", ex.getMessageValue());
        m_lc.log(cta::log::WARNING, "In TaskWatchDog::run(): failed to set tape session stats in sched. DB. Skipping.");
      }
300
301
      // Flush the one-of parameters one last time.
      std::list<Param> params;
302
303
      while (m_toAddParamsQueue.size())
        params.push_back(m_toAddParamsQueue.pop());
304
305
306
      if (params.size()) {
        m_initialProcess.addLogParams(m_driveUnitName, params);
      }
307
308
309
310
311
312
313
314

      std::list<std::string> paramsToDelete;
      // Flush the one-of parameters one last time.
      while (m_toDeleteParamsQueue.size())
        paramsToDelete.push_back(m_toDeleteParamsQueue.pop());
      if (params.size()) {
        m_initialProcess.deleteLogParams(m_driveUnitName, paramsToDelete);
      }
315
    }
316
317
318
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end our process. To delay the latter, we sleep half a second here.
319
    // To be sure we wait the full half second, we use a timed loop.
Victor Kotlyar's avatar
Victor Kotlyar committed
320
    cta::utils::Timer exitTimer;
321
322
    while (exitTimer.secs() < 0.5)
      usleep(100*1000);
323
  }
324
  
325
  public:
326
327
328
    
  /**
   * Constructor
329
330
   * @param periodToReport How often should we report to the mother (in seconds)
   * @param stuckPeriod  How long do we wait before we say we are stuck on file (in seconds)
331
332
333
334
   * @param initialProcess The proxy we use for sending heartbeat 
   * @param reportPacker 
   * @param lc To log the events
   */
335
  TaskWatchDog(double reportPeriod,double stuckPeriod,
336
         cta::tape::daemon::TapedProxy& initialProcess,
337
338
         cta::TapeMount & mount,
         const std::string & driveUnitName,
Victor Kotlyar's avatar
Victor Kotlyar committed
339
         cta::log::LogContext&  lc, double pollPeriod = 0.1):
340
  m_TapeBytesMovedMoved(0), m_statsSet(false), m_pollPeriod(pollPeriod),
341
  m_reportPeriod(reportPeriod), m_stuckPeriod(stuckPeriod), 
342
  m_initialProcess(initialProcess), m_mount(mount), m_driveUnitName(driveUnitName),
343
  m_fileBeingMoved(false), m_lc(lc) {
Victor Kotlyar's avatar
Victor Kotlyar committed
344
    m_lc.pushOrReplace(cta::log::Param("thread","Watchdog"));
345
  }
346
347
  
  /**
348
   * notify the watchdog a mem block has been moved
349
   */
350
  void notify(uint64_t movedBytes){
351
    cta::threading::MutexLocker locker(m_mutex);
352
    m_blockMovementTimer.reset();
353
    m_TapeBytesMovedMoved+=movedBytes;
354
  }
355
356
357
358
359
360
361
362
363
 
  /**
   * update the watchdog's statistics for the session delivery time
   * @param a new deliveryTime
   */
  void updateStatsDeliveryTime (const double deliveryTime) {
    cta::threading::MutexLocker locker(m_mutex);
    m_stats.deliveryTime = deliveryTime;
  }
364
  
365
366
367
368
369
370
371
372
373
374
375
376
  /**
   * update the watchdog's statistics for the session
   * @param stats the stats counters collection to push
   */
  void updateStatsWithoutDeliveryTime (const TapeSessionStats & stats) {
    cta::threading::MutexLocker locker(m_mutex);
    const double savedDeliveryTime = m_stats.deliveryTime;
    m_stats = stats;
    m_stats.deliveryTime = savedDeliveryTime;
    m_statsSet = true;
  }

377
378
379
380
381
  /**
   * update the watchdog's statistics for the session
   * @param stats the stats counters collection to push
   */
  void updateStats (const TapeSessionStats & stats) {
382
    cta::threading::MutexLocker locker(m_mutex);
383
384
385
386
387
388
389
390
391
392
    m_stats = stats;
    m_statsSet = true;
  }
  
  /**
   * update the tapeThreadTimer, allowing logging a "best estimate" total time
   * in case of crash. If the provided stats in updateStats has a non-zero total
   * time, we will not use this value.
   * @param tapeThreadTimer:  the start time of the tape thread
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
393
  void updateThreadTimer(const cta::utils::Timer & timer) {
394
    cta::threading::MutexLocker locker(m_mutex);
395
396
397
398
    m_tapeThreadTimer = timer;
  }
  
  /**
399
   * Queue new parameter to be sent asynchronously to the main thread.
400
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
401
  void addParameter (const cta::log::Param & param) {
402
403
404
405
406
407
408
409
    m_toAddParamsQueue.push(param);
  }

 /**
   * Queue the parameter to be deleted asynchronously in the main thread.
   */
  void deleteParameter (const std::string & param) {
    m_toDeleteParamsQueue.push(param);
410
411
  }
  
412
413
414
415
416
417
  /**
   * Add error by name. We will count the errors by name in the watchdog.
   */
  void addToErrorCount (const std::string & errorName) {
    uint32_t count;
    {
418
      cta::threading::MutexLocker locker(m_mutex);
419
420
421
422
423
424
425
426
427
      // There is no default constructor for uint32_t (auto set to zero),
      // so we need to consider 2 cases.
      if(m_errorCounts.end() != m_errorCounts.find(errorName)) {
        count = ++m_errorCounts[errorName];
      } else {
        count = m_errorCounts[errorName] = 1;
      }
    }
    // We ship the new value ASAP to the main thread.
Victor Kotlyar's avatar
Victor Kotlyar committed
428
    addParameter(cta::log::Param(errorName, count));
429
430
  }
  
431
432
433
434
435
436
  /**
   * Set error count. This is used for once per session events that could
   * be detected several times.
   */
  void setErrorCount (const std::string & errorName, uint32_t value) {
    {
437
      cta::threading::MutexLocker locker(m_mutex);
438
439
440
      m_errorCounts[errorName] = value;
    }
    // We ship the new value ASAP to the main thread.
Victor Kotlyar's avatar
Victor Kotlyar committed
441
    addParameter(cta::log::Param(errorName, value));
442
443
  }
  
444
445
446
447
  /**
   * Test whether an error happened
   */
  bool errorHappened() {
448
    cta::threading::MutexLocker locker(m_mutex);
449
450
451
    return m_errorCounts.size();
  }
  
452
453
454
  /**
   * Start the thread
   */
455
456
457
  void startThread(){
    start();
  }
458
459
  
  /**
460
   * Ask to stop the watchdog thread and join it
461
   */
462
  void stopAndWaitThread(){
463
464
465
    m_stopFlag.set();
    wait();
  }
466
  
467
468
469
470
471
472
};

/**
 * Implementation of TaskWatchDog for recalls 
 */
class RecallWatchDog: public TaskWatchDog {
473
  
474
private:
475
  
476
  /** The file we are working on */
477
478
  uint64_t m_fileId;
  uint64_t m_fSeq;
479
  
480
  virtual void logStuckFile() {
Victor Kotlyar's avatar
Victor Kotlyar committed
481
    cta::log::ScopedParamContainer params(m_lc);
482
    params.add("TimeSinceLastBlockMove", m_blockMovementTimer.secs())
483
          .add("fileId", m_fileId)
484
          .add("fileId",m_fileId)
485
          .add("fSeq",m_fSeq);
Victor Kotlyar's avatar
Victor Kotlyar committed
486
    m_lc.log(cta::log::WARNING, "No tape block movement for too long");
487
  }
488
  
489
public:
490
  
491
492
  /** Pass through constructor */
  RecallWatchDog(double periodToReport,double stuckPeriod,
493
    cta::tape::daemon::TapedProxy& initialProcess,
494
    cta::TapeMount & mount,
495
    const std::string & driveUnitName,
Victor Kotlyar's avatar
Victor Kotlyar committed
496
    cta::log::LogContext&  lc, double pollPeriod = 0.1): 
497
  TaskWatchDog(periodToReport, stuckPeriod, initialProcess, mount, driveUnitName, lc, 
498
    pollPeriod) {}
499
  
500
501
502
503
  /**
   * Notify the watchdog which file we are operating
   * @param file
   */
504
  void notifyBeginNewJob(const uint64_t fileId, uint64_t fSeq) {
505
    cta::threading::MutexLocker locker(m_mutex);
506
507
    m_fileId=fileId;
    m_fSeq=fSeq;
508
509
510
511
512
513
514
    m_fileBeingMoved=true;
  }
  
  /**
   * Notify the watchdog  we have finished operating on the current file
   */
  void fileFinished(){
515
    cta::threading::MutexLocker locker(m_mutex);
516
    m_fileBeingMoved=false;
517
518
    m_fileId=0;
    m_fSeq=0;
519
520
521
522
523
524
525
  }
};

/**
 * Implementation of TaskWatchDog for migrations 
 */
class MigrationWatchDog: public TaskWatchDog {
526

527
private:
528
  
529
  /** The file we are working on */
530
531
532
  uint64_t m_fileId;
  uint64_t m_fSeq;
 
533
  virtual void logStuckFile() {
Victor Kotlyar's avatar
Victor Kotlyar committed
534
    cta::log::ScopedParamContainer params(m_lc);
535
    params.add("TimeSinceLastBlockMove", m_blockMovementTimer.secs())
536
          .add("fileId",m_fileId)
537
          .add("fSeq",m_fSeq);
Victor Kotlyar's avatar
Victor Kotlyar committed
538
    m_lc.log(cta::log::WARNING, "No tape block movement for too long");
539
  }
540
  
541
public:
542
  
543
  /** Pass through constructor */
544
  MigrationWatchDog(double periodToReport,double stuckPeriod,
545
    cta::tape::daemon::TapedProxy& initialProcess,
546
    cta::TapeMount & mount,
547
    const std::string & driveUnitName,
Victor Kotlyar's avatar
Victor Kotlyar committed
548
    cta::log::LogContext lc, double pollPeriod = 0.1): 
549
  TaskWatchDog(periodToReport, stuckPeriod, initialProcess, mount, driveUnitName, lc, 
550
    pollPeriod) {}
551
  
552
553
554
555
  /**
   * Notify the watchdog which file we are operating
   * @param file
   */
556
  void notifyBeginNewJob(const uint64_t fileId, uint64_t fSeq){
557
    cta::threading::MutexLocker locker(m_mutex);
558
559
    m_fileId=fileId;
    m_fSeq=fSeq;
560
561
562
563
564
565
566
    m_fileBeingMoved=true;
  }
  
  /**
   * Notify the watchdog  we have finished operating on the current file
   */
  void fileFinished(){
567
    cta::threading::MutexLocker locker(m_mutex);
568
    m_fileBeingMoved=false;
569
570
    m_fileId=0;
    m_fSeq=0;
571
  }
572
};
573

574
}}}}