TaskWatchDog.hpp 18.3 KB
Newer Older
David COME's avatar
David COME committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once 

26

27
28
#include "common/threading/AtomicFlag.hpp"
#include "common/threading/BlockingQueue.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
29
#include "common/log/LogContext.hpp"
30
#include "tapeserver/daemon/TapedProxy.hpp"
31
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
32
#include "common/Timer.hpp"
33
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
34
#include "scheduler/TapeMount.hpp"
35

36
#include <map>
Steven Murray's avatar
Steven Murray committed
37
38
#include <unistd.h>

David COME's avatar
David COME committed
39
40
41
42
43
namespace castor {

namespace tape {
namespace tapeserver {
namespace daemon {
44
/**
45
 * Virtual class for watching tape read or write operation (mostly complete)
46
 */
47
class TaskWatchDog : private cta::threading::Thread{
48
49
50
51
protected:
  /**
   * The mutex protecting updates and the worker thread from each other
   */
52
  cta::threading::Mutex m_mutex;
53
  
54
  /**
55
56
57
   *  Number of blocks we moved since the last update. Has to be atomic because it is 
   *  updated from the outside 
   */
58
  uint64_t m_TapeBytesMovedMoved;
59
  
60
61
62
63
64
65
66
67
68
69
70
71
72
  /**
   * Statistics of the current session
   */
  TapeSessionStats m_stats;
  
  /**
   * Did we set the stats at least once?
   */
  bool m_statsSet;
  
  /**
   * A timer allowing estimation of the total timer for crashing sessions
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
73
  cta::utils::Timer m_tapeThreadTimer;
74
  
75
76
  /**
   *  Timer for regular heartbeat reports to parent process
77
   */  
Victor Kotlyar's avatar
Victor Kotlyar committed
78
  cta::utils::Timer m_reportTimer;
79
80
  
  /*
81
   *  How long since we last logged a warning?
82
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
83
  cta::utils::Timer m_blockMovementReportTimer;
84
  
85
86
  /**
   * how long since the last block movement?
87
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
88
  cta::utils::Timer m_blockMovementTimer;
89
  
90
91
  /**
   * How fast should we tick?
92
   */
93
  const double m_pollPeriod;
94
  
95
96
  /**
   *  How often to we send heartbeat notifications (in second)
97
   */
98
  const double m_reportPeriod; 
99
  
100
101
  /**
   *  How long to we have to wait before saying we are stuck (in second)
102
   */
103
  const double m_stuckPeriod;
104
105
  
  /*
106
   *  Atomic flag to stop the thread's loop
107
   */
108
  cta::threading::AtomicFlag m_stopFlag;
109
110
  
  /*
111
   *  The proxy that will receive or heartbeat notifications
112
   */
113
  cta::tape::daemon::TapedProxy& m_initialProcess;
114
  
115
116
117
118
119
120
  /**
   * Reference to the current mount. Used to report performance statistics to object
   * store.
   */
  cta::TapeMount & m_mount;

121
122
123
124
  /**
   * The drive unit name to report
   */
  const std::string m_driveUnitName;
125
126
127
  /*
   *  Is the system at _this very moment_ reading or writing on tape
   */
128
  bool m_fileBeingMoved;
129
130
131
  /*
   * Logging system  
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
132
  cta::log::LogContext m_lc;
133
134
135
136
137
  
  /*
   * Member function actually logging the file.
   */
  virtual void logStuckFile() = 0;
138
139
140
141
  
  /**
   * One offs parameters to be sent to the initial process
   */
142
143
144
145
146
147
  cta::threading::BlockingQueue<cta::log::Param> m_toAddParamsQueue;

  /**
   * One offs parameters to be deleted from the initial process
   */
  cta::threading::BlockingQueue<std::string> m_toDeleteParamsQueue;
148
149
  
  /**
150
   * Map of all error counts
151
152
   */
  std::map<std::string, uint32_t> m_errorCounts;
153

154
155
156
157
158
159
  /**
   * Send the statistics to the initial process. m_mutex should be taken before
   * calling.
   */
  void reportStats() {
    // Shortcut definitions
Victor Kotlyar's avatar
Victor Kotlyar committed
160
    typedef cta::log::Param Param;
161
162
163
    if (m_statsSet) {
      // Build the statistics to be logged
      std::list<Param> paramList;
164
165
166
167
168
169
      // for delivery time we estimate it ourselves before getting the
      // final stats at the end of the last disk thread. This allows proper
      // estimation of statistics for data transfer sessions that get killed
      // before completion.
      const double deliveryTime = 
        m_stats.deliveryTime?m_stats.deliveryTime:m_tapeThreadTimer.secs();
170
171
172
173
      // total time is a special case. We will estimate it ourselves before
      // getting the final stats (at the end of the thread). This will allow
      // proper estimation of statistics for tape sessions that get killed
      // before completion.
174
175
176
177
      const double totalTime = m_stats.totalTime?m_stats.totalTime:m_tapeThreadTimer.secs();
      /** Time beetwen fineshed tape thread and finished disk threads */
      const double drainingTime = 
        deliveryTime > totalTime?deliveryTime-totalTime: 0.0;
178
179
180
181
182
183
184
      bool wasTapeMounted = true;
      if(m_stats.mountTime == 0.0){
	//Tape was not mounted, we add a message to tell that no physical mount has been
	//triggered
	wasTapeMounted = false;
      }
      paramList.push_back(Param("wasTapeMounted",wasTapeMounted));
185
186
187
188
189
190
191
      paramList.push_back(Param("mountTime", m_stats.mountTime));
      paramList.push_back(Param("positionTime", m_stats.positionTime));
      paramList.push_back(Param("waitInstructionsTime", m_stats.waitInstructionsTime));
      paramList.push_back(Param("waitFreeMemoryTime", m_stats.waitFreeMemoryTime));
      paramList.push_back(Param("waitDataTime", m_stats.waitDataTime));
      paramList.push_back(Param("waitReportingTime", m_stats.waitReportingTime));
      paramList.push_back(Param("checksumingTime", m_stats.checksumingTime));
192
      paramList.push_back(Param("readWriteTime", m_stats.readWriteTime));
193
194
195
      paramList.push_back(Param("flushTime", m_stats.flushTime));
      paramList.push_back(Param("unloadTime", m_stats.unloadTime));
      paramList.push_back(Param("unmountTime", m_stats.unmountTime));
196
      paramList.push_back(Param("encryptionControlTime", m_stats.encryptionControlTime));
197
      paramList.push_back(Param("transferTime", m_stats.transferTime()));
198
      paramList.push_back(Param("totalTime", totalTime));
199
200
      paramList.push_back(Param("deliveryTime", deliveryTime));
      paramList.push_back(Param("drainingTime", drainingTime));
201
202
203
204
205
206
207
208
      paramList.push_back(Param("dataVolume", m_stats.dataVolume));
      paramList.push_back(Param("filesCount", m_stats.filesCount));
      paramList.push_back(Param("headerVolume", m_stats.headerVolume));
      // Compute performance
      paramList.push_back(Param("payloadTransferSpeedMBps", totalTime?1.0*m_stats.dataVolume
                /1000/1000/totalTime:0.0));
      paramList.push_back(Param("driveTransferSpeedMBps", totalTime?1.0*(m_stats.dataVolume+m_stats.headerVolume)
                /1000/1000/totalTime:0.0));
209
210
211
212
213
214
      if(m_mount.getMountType() == cta::common::dataStructures::MountType::Retrieve){
	paramList.push_back(Param("repackFilesCount",m_stats.repackFilesCount));
	paramList.push_back(Param("userFilesCount",m_stats.userFilesCount));
	paramList.push_back(Param("repackBytesCount",m_stats.repackBytesCount));
	paramList.push_back(Param("userBytesCount",m_stats.userBytesCount));
      }
215
216
217
218
219
      // Ship the logs to the initial process
      m_initialProcess.addLogParams(m_driveUnitName, paramList);
    }
  }
  
220
  /**
221
   * Thread's loop
222
   */
223
  void run(){
224
    // Shortcut definitions
Victor Kotlyar's avatar
Victor Kotlyar committed
225
    typedef cta::log::Param Param;
226
227
228
229
    // reset timers as we don't know how long it took before the thread started
    m_reportTimer.reset();
    m_blockMovementReportTimer.reset();
    m_blockMovementTimer.reset();
230
231
    while(!m_stopFlag) {
      
232
      // Critical section block for internal watchdog
233
      {
234
        cta::threading::MutexLocker locker(m_mutex);
David COME's avatar
David COME committed
235
        //useful if we are stuck on a particular file
236
237
238
        if(m_fileBeingMoved && 
           m_blockMovementTimer.secs()>m_stuckPeriod &&
           m_blockMovementReportTimer.secs()>m_stuckPeriod){
239
          // We are stuck while moving a file. We will just log that.
240
241
          logStuckFile();
          m_blockMovementReportTimer.reset();
242
        }
243
      }
244
245
246
247
248
249
      
      // Send any one-off parameter
      {
        std::list<Param> params;
        // This is thread safe because we are the only consumer:
        // a non-zero size guarantees we will find something.
250
251
        while (m_toAddParamsQueue.size())
          params.push_back(m_toAddParamsQueue.pop());
252
253
254
255
        if (params.size()) {
          m_initialProcess.addLogParams(m_driveUnitName, params);
        }
      }
David COME's avatar
David COME committed
256

257
258
259
260
261
262
263
264
265
266
267
268
      // Send any one-off parameter to delete
      {
        std::list<std::string> params;
        // This is thread safe because we are the only consumer:
        // a non-zero size guarantees we will find something.
        while (m_toDeleteParamsQueue.size())
          params.push_back(m_toDeleteParamsQueue.pop());
        if (params.size()) {
          m_initialProcess.deleteLogParams(m_driveUnitName, params);
        }
      }

David COME's avatar
David COME committed
269
      //heartbeat to notify activity to the mother
270
      // and transmit statistics
271
      if(m_reportTimer.secs() > m_reportPeriod){
272
        cta::threading::MutexLocker locker(m_mutex);
Victor Kotlyar's avatar
Victor Kotlyar committed
273
        m_lc.log(cta::log::DEBUG,"going to report");
274
        m_reportTimer.reset();
275
        m_initialProcess.reportHeartbeat(m_TapeBytesMovedMoved, 0);
276
        reportStats(); 
277
278
279
280
281
282
283
284
        try {
          m_mount.setTapeSessionStats(m_stats);
        } catch (cta::exception::Exception & ex) {
          cta::log::ScopedParamContainer params (m_lc);
          params.add("exceptionMessage", ex.getMessageValue());
          m_lc.log(cta::log::WARNING, "In TaskWatchDog::run(): failed to set tape session stats in sched. DB. Skipping.");
        }

285
286
      } 
      else{
287
        usleep(m_pollPeriod*1000*1000);
288
289
      }
    }
290
    // At the end of the run, make sure we push the last stats to the initial
291
    // process and that we process any remaining parameter.
292
    {
293
      cta::threading::MutexLocker locker(m_mutex);
294
      reportStats();
295
296
297
298
299
300
301
      try {
        m_mount.setTapeSessionStats(m_stats);
      } catch (cta::exception::Exception & ex) {
        cta::log::ScopedParamContainer params (m_lc);
        params.add("exceptionMessage", ex.getMessageValue());
        m_lc.log(cta::log::WARNING, "In TaskWatchDog::run(): failed to set tape session stats in sched. DB. Skipping.");
      }
302
303
      // Flush the one-of parameters one last time.
      std::list<Param> params;
304
305
      while (m_toAddParamsQueue.size())
        params.push_back(m_toAddParamsQueue.pop());
306
307
308
      if (params.size()) {
        m_initialProcess.addLogParams(m_driveUnitName, params);
      }
309
310
311
312
313
314
315
316

      std::list<std::string> paramsToDelete;
      // Flush the one-of parameters one last time.
      while (m_toDeleteParamsQueue.size())
        paramsToDelete.push_back(m_toDeleteParamsQueue.pop());
      if (params.size()) {
        m_initialProcess.deleteLogParams(m_driveUnitName, paramsToDelete);
      }
317
    }
318
319
320
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end our process. To delay the latter, we sleep half a second here.
321
    // To be sure we wait the full half second, we use a timed loop.
Victor Kotlyar's avatar
Victor Kotlyar committed
322
    cta::utils::Timer exitTimer;
323
324
    while (exitTimer.secs() < 0.5)
      usleep(100*1000);
325
  }
326
  
327
  public:
328
329
330
    
  /**
   * Constructor
331
332
   * @param periodToReport How often should we report to the mother (in seconds)
   * @param stuckPeriod  How long do we wait before we say we are stuck on file (in seconds)
333
334
335
336
   * @param initialProcess The proxy we use for sending heartbeat 
   * @param reportPacker 
   * @param lc To log the events
   */
337
  TaskWatchDog(double reportPeriod,double stuckPeriod,
338
         cta::tape::daemon::TapedProxy& initialProcess,
339
340
         cta::TapeMount & mount,
         const std::string & driveUnitName,
Victor Kotlyar's avatar
Victor Kotlyar committed
341
         cta::log::LogContext&  lc, double pollPeriod = 0.1):
342
  m_TapeBytesMovedMoved(0), m_statsSet(false), m_pollPeriod(pollPeriod),
343
  m_reportPeriod(reportPeriod), m_stuckPeriod(stuckPeriod), 
344
  m_initialProcess(initialProcess), m_mount(mount), m_driveUnitName(driveUnitName),
345
  m_fileBeingMoved(false), m_lc(lc) {
Victor Kotlyar's avatar
Victor Kotlyar committed
346
    m_lc.pushOrReplace(cta::log::Param("thread","Watchdog"));
347
  }
348
349
  
  /**
350
   * notify the watchdog a mem block has been moved
351
   */
352
  void notify(uint64_t movedBytes){
353
    cta::threading::MutexLocker locker(m_mutex);
354
    m_blockMovementTimer.reset();
355
    m_TapeBytesMovedMoved+=movedBytes;
356
  }
357
358
359
360
361
362
363
364
365
 
  /**
   * update the watchdog's statistics for the session delivery time
   * @param a new deliveryTime
   */
  void updateStatsDeliveryTime (const double deliveryTime) {
    cta::threading::MutexLocker locker(m_mutex);
    m_stats.deliveryTime = deliveryTime;
  }
366
  
367
368
369
370
371
372
373
374
375
376
377
378
  /**
   * update the watchdog's statistics for the session
   * @param stats the stats counters collection to push
   */
  void updateStatsWithoutDeliveryTime (const TapeSessionStats & stats) {
    cta::threading::MutexLocker locker(m_mutex);
    const double savedDeliveryTime = m_stats.deliveryTime;
    m_stats = stats;
    m_stats.deliveryTime = savedDeliveryTime;
    m_statsSet = true;
  }

379
380
381
382
383
  /**
   * update the watchdog's statistics for the session
   * @param stats the stats counters collection to push
   */
  void updateStats (const TapeSessionStats & stats) {
384
    cta::threading::MutexLocker locker(m_mutex);
385
386
387
388
389
390
391
392
393
394
    m_stats = stats;
    m_statsSet = true;
  }
  
  /**
   * update the tapeThreadTimer, allowing logging a "best estimate" total time
   * in case of crash. If the provided stats in updateStats has a non-zero total
   * time, we will not use this value.
   * @param tapeThreadTimer:  the start time of the tape thread
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
395
  void updateThreadTimer(const cta::utils::Timer & timer) {
396
    cta::threading::MutexLocker locker(m_mutex);
397
398
399
400
    m_tapeThreadTimer = timer;
  }
  
  /**
401
   * Queue new parameter to be sent asynchronously to the main thread.
402
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
403
  void addParameter (const cta::log::Param & param) {
404
405
406
407
408
409
410
411
    m_toAddParamsQueue.push(param);
  }

 /**
   * Queue the parameter to be deleted asynchronously in the main thread.
   */
  void deleteParameter (const std::string & param) {
    m_toDeleteParamsQueue.push(param);
412
413
  }
  
414
415
416
417
418
419
  /**
   * Add error by name. We will count the errors by name in the watchdog.
   */
  void addToErrorCount (const std::string & errorName) {
    uint32_t count;
    {
420
      cta::threading::MutexLocker locker(m_mutex);
421
422
423
424
425
426
427
428
429
      // There is no default constructor for uint32_t (auto set to zero),
      // so we need to consider 2 cases.
      if(m_errorCounts.end() != m_errorCounts.find(errorName)) {
        count = ++m_errorCounts[errorName];
      } else {
        count = m_errorCounts[errorName] = 1;
      }
    }
    // We ship the new value ASAP to the main thread.
Victor Kotlyar's avatar
Victor Kotlyar committed
430
    addParameter(cta::log::Param(errorName, count));
431
432
  }
  
433
434
435
436
437
438
  /**
   * Set error count. This is used for once per session events that could
   * be detected several times.
   */
  void setErrorCount (const std::string & errorName, uint32_t value) {
    {
439
      cta::threading::MutexLocker locker(m_mutex);
440
441
442
      m_errorCounts[errorName] = value;
    }
    // We ship the new value ASAP to the main thread.
Victor Kotlyar's avatar
Victor Kotlyar committed
443
    addParameter(cta::log::Param(errorName, value));
444
445
  }
  
446
447
448
449
  /**
   * Test whether an error happened
   */
  bool errorHappened() {
450
    cta::threading::MutexLocker locker(m_mutex);
451
452
453
    return m_errorCounts.size();
  }
  
454
455
456
  /**
   * Start the thread
   */
457
458
459
  void startThread(){
    start();
  }
460
461
  
  /**
462
   * Ask to stop the watchdog thread and join it
463
   */
464
  void stopAndWaitThread(){
465
466
467
    m_stopFlag.set();
    wait();
  }
468
  
469
470
471
472
473
474
};

/**
 * Implementation of TaskWatchDog for recalls 
 */
class RecallWatchDog: public TaskWatchDog {
475
  
476
private:
477
  
478
  /** The file we are working on */
479
480
  uint64_t m_fileId;
  uint64_t m_fSeq;
481
  
482
  virtual void logStuckFile() {
Victor Kotlyar's avatar
Victor Kotlyar committed
483
    cta::log::ScopedParamContainer params(m_lc);
484
    params.add("TimeSinceLastBlockMove", m_blockMovementTimer.secs())
485
          .add("fileId", m_fileId)
486
          .add("fileId",m_fileId)
487
          .add("fSeq",m_fSeq);
Victor Kotlyar's avatar
Victor Kotlyar committed
488
    m_lc.log(cta::log::WARNING, "No tape block movement for too long");
489
  }
490
  
491
public:
492
  
493
494
  /** Pass through constructor */
  RecallWatchDog(double periodToReport,double stuckPeriod,
495
    cta::tape::daemon::TapedProxy& initialProcess,
496
    cta::TapeMount & mount,
497
    const std::string & driveUnitName,
Victor Kotlyar's avatar
Victor Kotlyar committed
498
    cta::log::LogContext&  lc, double pollPeriod = 0.1): 
499
  TaskWatchDog(periodToReport, stuckPeriod, initialProcess, mount, driveUnitName, lc, 
500
    pollPeriod) {}
501
  
502
503
504
505
  /**
   * Notify the watchdog which file we are operating
   * @param file
   */
506
  void notifyBeginNewJob(const uint64_t fileId, uint64_t fSeq) {
507
    cta::threading::MutexLocker locker(m_mutex);
508
509
    m_fileId=fileId;
    m_fSeq=fSeq;
510
511
512
513
514
515
516
    m_fileBeingMoved=true;
  }
  
  /**
   * Notify the watchdog  we have finished operating on the current file
   */
  void fileFinished(){
517
    cta::threading::MutexLocker locker(m_mutex);
518
    m_fileBeingMoved=false;
519
520
    m_fileId=0;
    m_fSeq=0;
521
522
523
524
525
526
527
  }
};

/**
 * Implementation of TaskWatchDog for migrations 
 */
class MigrationWatchDog: public TaskWatchDog {
528

529
private:
530
  
531
  /** The file we are working on */
532
533
534
  uint64_t m_fileId;
  uint64_t m_fSeq;
 
535
  virtual void logStuckFile() {
Victor Kotlyar's avatar
Victor Kotlyar committed
536
    cta::log::ScopedParamContainer params(m_lc);
537
    params.add("TimeSinceLastBlockMove", m_blockMovementTimer.secs())
538
          .add("fileId",m_fileId)
539
          .add("fSeq",m_fSeq);
Victor Kotlyar's avatar
Victor Kotlyar committed
540
    m_lc.log(cta::log::WARNING, "No tape block movement for too long");
541
  }
542
  
543
public:
544
  
545
  /** Pass through constructor */
546
  MigrationWatchDog(double periodToReport,double stuckPeriod,
547
    cta::tape::daemon::TapedProxy& initialProcess,
548
    cta::TapeMount & mount,
549
    const std::string & driveUnitName,
Victor Kotlyar's avatar
Victor Kotlyar committed
550
    cta::log::LogContext lc, double pollPeriod = 0.1): 
551
  TaskWatchDog(periodToReport, stuckPeriod, initialProcess, mount, driveUnitName, lc, 
552
    pollPeriod) {}
553
  
554
555
556
557
  /**
   * Notify the watchdog which file we are operating
   * @param file
   */
558
  void notifyBeginNewJob(const uint64_t fileId, uint64_t fSeq){
559
    cta::threading::MutexLocker locker(m_mutex);
560
561
    m_fileId=fileId;
    m_fSeq=fSeq;
562
563
564
565
566
567
568
    m_fileBeingMoved=true;
  }
  
  /**
   * Notify the watchdog  we have finished operating on the current file
   */
  void fileFinished(){
569
    cta::threading::MutexLocker locker(m_mutex);
570
    m_fileBeingMoved=false;
571
572
    m_fileId=0;
    m_fSeq=0;
573
  }
574
};
575

576
}}}}