DiskReadTask.hpp 3.67 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

David COME's avatar
David COME committed
26
#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
27
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
28
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
29
#include "castor/tape/tapeserver/daemon/ErrorFlag.hpp"
30
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
31
#include "common/threading/AtomicFlag.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
32
#include "common/log/LogContext.hpp"
33
#include "castor/tape/tapeserver/file/DiskFile.hpp"
34

David COME's avatar
David COME committed
35
36
37
38
39
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
  
40
class DiskReadTask {
41
public:
42
43
44
45
46
  /**
   * @param destination The task that will consume data block we fill up
   * @param file the file we are migrating. We acquire the ownership of the pointer
   * @param numberOfBlock number of memory block we need read the whole file
   */
47
  DiskReadTask(DataConsumer & destination, 
48
          cta::ArchiveJob *archiveJob,size_t numberOfBlock,
49
          cta::threading::AtomicFlag& errorFlag);
50
  
Victor Kotlyar's avatar
Victor Kotlyar committed
51
  void execute(cta::log::LogContext&  lc, diskFile::DiskFileFactory & fileFactory,
52
53
    MigrationWatchDog & watchdog, const int threadID);
  /**
54
55
56
57
58
   * Return the stats of the tasks. Should be call after execute 
   * (otherwise, it is pointless)
   * @return 
   */
  const DiskStats getTaskStats() const;
59
private:
60
61
62
63
64
65
66
67
68
  
  /**
   * Stats to measue how long it takes to write on disk
   */
  DiskStats m_stats;
  
  /**
   * Throws an exception if m_errorFlag is set
   */
69
  void checkMigrationFailing() const {
70
71
    //if a task has signaled an error, we stop our job
    if(m_errorFlag){
72
      throw  castor::tape::tapeserver::daemon::ErrorFlag();
73
74
    }
  }
75
  
76
  /**
77
78
79
80
81
   * log into lc all m_stats parameters with the given message at the 
   * given level
   * @param level
   * @param message
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
82
  void logWithStat(int level,const std::string& msg, cta::log::LogContext&  lc) ;
83
  
84
85
86
87
88
  /**
   * Circulate the remaining free blocks after an error
   * @param fromBlockId the number of already processed
   * @param mb pointer to a possible already popped free block (NULL otherwise)
   */
89
  void circulateAllBlocks(size_t fromBlockId, MemBlock * mb);
90
91
92
  /**
   * The task (a TapeWriteTask) that will handle the read blocks
   */
93
  DataConsumer & m_nextTask;
94
95
96
97
  
  /**
   * All we need to know about the file we are migrating
   */
98
  cta::ArchiveJob *m_archiveJob;
99
  
100
101
102
103
104
105
106
107
108
109
  /**
   * Information about the archive job we will cache as it is needed
   * after the archive job is going to be potentially deleted by the 
   * writer
   */
  struct {
    std::string remotePath;
    uint64_t fileId;
  } m_archiveJobCachedInfo;
  
110
111
112
  /**
   * The number of memory block we will need to read the whole file
   */
113
  size_t m_numberOfBlock;
114
  
115
  cta::threading::AtomicFlag& m_errorFlag;
116
};
David COME's avatar
David COME committed
117
118

}}}}
119