TapeWriteSingleThread.hpp 11.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/******************************************************************************
 *                      TapeWriteSingleThread.hpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

27
#include "common/processCap/ProcessCap.hpp"
28
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
29
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
30
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
31
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
32
#include "castor/tape/tapeserver/daemon/TapeServerReporter.hpp"
33
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
34
35
#include "castor/server/BlockingQueue.hpp"
#include "castor/server/Threading.hpp"
36
#include "castor/tape/tapeserver/file/File.hpp"
37
38
#include "castor/utils/Timer.hpp"

39
40
#include <iostream>
#include <stdio.h>
41

42
43
namespace castor     {
namespace tape       {
David COME's avatar
David COME committed
44
namespace tapeserver {
45
namespace daemon     {
46
47
48
// forward definition
class MigrationTaskInjector;
  
49
class TapeWriteSingleThread : public TapeSingleThreadInterface<TapeWriteTask> {
50
public:
51
52
53
54
55
56
57
58
59
60
61
  /**
   * Constructor
   * @param drive an interface for manipulating the drive in order 
   * to write on the tape 
   * @param vid the volume ID of the tape on which we are going to write
   * @param lc 
   * @param repPacker the object that will send reports to the client
   * @param filesBeforeFlush  how many file written before flushing on tape
   * @param bytesBeforeFlush how many bytes written before flushing on tape
   * @param lastFseq the last fSeq 
   */
62
  TapeWriteSingleThread(
63
    castor::tape::tapeserver::drive::DriveInterface & drive, 
64
    mediachanger::MediaChangerFacade &mc,
65
    TapeServerReporter & tsr,
66
    MigrationWatchDog & mwd,
67
    const VolumeInfo& volInfo,
Daniele Kruse's avatar
Daniele Kruse committed
68
69
    castor::log::LogContext & lc,
    MigrationReportPacker & repPacker,
70
    cta::server::ProcessCap &capUtils,
71
72
    uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush, const bool useLbp);
  
73
74
75
76
  /**
   * 
   * @param lastFseq
   */
77
  void setlastFseq(uint64_t lastFseq);
78
79
80
81
82
83
84
85
86
87
88
89
90
  
  /**
   * Sets up the pointer to the task injector. This cannot be done at
   * construction time as both task injector and tape write single thread refer to 
   * each other. This function should be called before starting the threads.
   * This is used for signalling problems during mounting. After that, each 
   * tape write task does the signalling itself, either on tape problem, or
   * when receiving an error from the disk tasks via memory blocks.
   * @param injector pointer to the task injector
   */
  void setTaskInjector(MigrationTaskInjector* injector){
      m_injector = injector;
  }
91
92
93
94
95
96
97
98
private:  
  
  /**
   * Returns the string representation of the specified mount type
   */
  const char *mountTypeToString(const cta::MountType::Enum mountType) const
    throw();
  
99
100
    class TapeCleaning{
    TapeWriteSingleThread& m_this;
Eric Cano's avatar
WIP    
Eric Cano committed
101
    // As we are living in the single thread of tape, we can borrow the timer
102
    castor::utils::Timer & m_timer;
103
  public:
104
    TapeCleaning(TapeWriteSingleThread& parent, castor::utils::Timer & timer):
105
      m_this(parent), m_timer(timer) {}
106
    ~TapeCleaning(){
107
      m_this.m_reportPacker.reportDriveStatus(cta::common::DriveStatus::CleaningUp);
108
109
110
111
      // This out-of-try-catch variables allows us to record the stage of the 
      // process we're in, and to count the error if it occurs.
      // We will not record errors for an empty string. This will allow us to
      // prevent counting where error happened upstream.
112
113
      // Log (safely, exception-wise) the tape alerts (if any) at the end of the session
      try { m_this.logTapeAlerts(); } catch (...) {}
114
      std::string currentErrorToCount = "Error_tapeUnload";
115
      try{
Eric Cano's avatar
WIP    
Eric Cano committed
116
        // Do the final cleanup
117
118
119
120
        // First check that a tape is actually present in the drive. We can get here
        // after failing to mount (library error) in which case there is nothing to
        // do (and trying to unmount will only lead to a failure.)
        const uint32_t waitMediaInDriveTimeout = 60;
121
122
        try {
          m_this.m_drive.waitUntilReady(waitMediaInDriveTimeout);
123
        } catch (cta::exception::Exception &) {}
124
125
126
127
        if (!m_this.m_drive.hasTapeInPlace()) {
          m_this.m_logContext.log(LOG_INFO, "TapeReadSingleThread: No tape to unload");
          goto done;
        }
Eric Cano's avatar
WIP    
Eric Cano committed
128
        // in the special case of a "manual" mode tape, we should skip the unload too.
129
        if (mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType()) {
130
          m_this.m_reportPacker.reportDriveStatus(cta::common::DriveStatus::Unloading);
Eric Cano's avatar
WIP    
Eric Cano committed
131
132
133
134
135
          m_this.m_drive.unloadTape();
          m_this.m_logContext.log(LOG_INFO, "TapeWriteSingleThread: Tape unloaded");
        } else {
          m_this.m_logContext.log(LOG_INFO, "TapeWriteSingleThread: Tape NOT unloaded (manual mode)");
        }
136
        m_this.m_stats.unloadTime += m_timer.secs(castor::utils::Timer::resetCounter);
Eric Cano's avatar
WIP    
Eric Cano committed
137
138
139
        // And return the tape to the library
        // In case of manual mode, this will be filtered by the rmc daemon
        // (which will do nothing)
140
        currentErrorToCount = "Error_tapeDismount";
141
        m_this.m_reportPacker.reportDriveStatus(cta::common::DriveStatus::Unmounting);
142
        m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.getLibrarySlot());
143
        m_this.m_drive.disableLogicalBlockProtection();
144
        m_this.m_reportPacker.reportDriveStatus(cta::common::DriveStatus::Up);
145
        m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter);
146
        m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType() ?
Eric Cano's avatar
WIP    
Eric Cano committed
147
          "TapeWriteSingleThread : tape unmounted":"TapeWriteSingleThread : tape NOT unmounted (manual mode)");
148
149
        m_this.m_initialProcess.reportState(cta::tape::session::SessionState::Shutdown,
          cta::tape::session::SessionType::Archive);
150
        m_this.m_stats.waitReportingTime += m_timer.secs(castor::utils::Timer::resetCounter);
151
      }
152
      catch(const cta::exception::Exception& ex){
153
154
        // Notify something failed during the cleaning 
        m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
155
        m_this.m_reportPacker.reportDriveStatus(cta::common::DriveStatus::Down);
156
        castor::log::ScopedParamContainer scoped(m_this.m_logContext);
157
        scoped.add("exception_message", ex.getMessageValue());
158
        m_this.m_logContext.log(LOG_ERR, "Exception in TapeWriteSingleThread-TapeCleaning");
159
160
161
162
163
164
165
        // As we do not throw exceptions from here, the watchdog signalling has
        // to occur from here.
        try {
          if (currentErrorToCount.size()) {
            m_this.m_watchdog.addToErrorCount(currentErrorToCount);
          }
        } catch (...) {}
166
      } catch (...) {
167
168
          // Notify something failed during the cleaning 
          m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
169
          m_this.m_reportPacker.reportDriveStatus(cta::common::DriveStatus::Down);
170
          m_this.m_logContext.log(LOG_ERR, "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape");
171
172
173
174
175
          try {
          if (currentErrorToCount.size()) {
            m_this.m_watchdog.addToErrorCount(currentErrorToCount);
          }
        } catch (...) {}
176
      }
177
      done:
178
      //then we terminate the global status reporter
179
      m_this.m_initialProcess.finish();
180
181
    }
  };
182
183
184
185
186
  /**
   * Will throw an exception if we cant write on the tape
   */
  void isTapeWritable() const;

187
188
189
  /**
   * Log  m_stats  parameters into m_logContext with msg at the given level
   */
190
  void logWithStats(int level,const std::string& msg,
191
    log::ScopedParamContainer& params);
192
  
193
194
  /**
   * Function to open the WriteSession 
195
196
   * If successful, returns a std::unique_ptr on it. A copy of that std::unique_ptr
   * will give the caller the ownership of the opened session (see unique_ptr 
197
   * copy constructor, which has a move semantic)
198
   * @return the WriteSession we need to write on tape
199
   */
200
  std::unique_ptr<castor::tape::tapeFile::WriteSession> openWriteSession();
201
  /**
202
   * Execute flush on tape, do some log and report the flush to the client
203
204
205
206
207
208
   * @param message the message the log will register
   * @param bytes the number of bytes that have been written since the last flush  
   * (for logging)
   * @param files the number of files that have been written since the last flush  
   * (also for logging)
   */
Eric Cano's avatar
WIP    
Eric Cano committed
209
  void tapeFlush(const std::string& message,uint64_t bytes,uint64_t files,
210
    castor::utils::Timer & timer);
211
  
212

213
  virtual void run() ;
214
  
215
216
217
218
  //m_filesBeforeFlush and m_bytesBeforeFlush are thresholds for flushing 
  //the first one crossed will trigger the flush on tape
  
  ///how many file written before flushing on tape
219
  const uint64_t m_filesBeforeFlush;
220
221
  
  ///how many bytes written before flushing on tape
222
  const uint64_t m_bytesBeforeFlush;
223

224
  ///an interface for manipulating all type of drives
225
  castor::tape::tapeserver::drive::DriveInterface& m_drive;
226
227
  
  ///the object that will send reports to the client
228
  MigrationReportPacker & m_reportPacker;
229
  
230
231
232
233
234
  /**
   * the last fseq that has been written on the tape = the starting point 
   * of our session. The last Fseq is computed by subtracting 1 to fSeg
   * of the first file to migrate we receive. That part is done by the 
   * MigrationTaskInjector.::synchronousInjection. Thus, we compute it into 
235
   * that function and retrieve/set it within DataTransferSession executeWrite
236
237
238
239
240
241
242
243
244
245
   * after we make sure synchronousInjection returned true. 
   * 
   * It should be const, but it cant 
   * (because there is no mutable function member in c++)
   */
   uint64_t m_lastFseq;

  /**
   * Should the compression be enabled ? This is currently hard coded to true 
   */
246
  const bool m_compress;
247
  
248
249
250
251
252
253
  /**
   * The boolean variable describing to use on not to use Logical
   * Block Protection.
   */
  const bool m_useLbp;
  
254
255
256
257
  /**
   * Reference to the watchdog, used in run()
   */
  MigrationWatchDog & m_watchdog;
258
  
259
260
261
262
263
264
265
266
267
protected:
  /***
   * Helper virtual function to access the watchdog from parent class
   */
  virtual void countTapeLogError(const std::string & error) { 
    m_watchdog.addToErrorCount(error);
  }
  
private:
268
269
270
271
  /**
   *  Pointer to the task injector allowing termination signaling 
   */
  MigrationTaskInjector* m_injector;
272
273
274
275
276
277
278

}; // class TapeWriteSingleThread

} // namespace daemon
} // namespace tapeserver
} // namsepace tape
} // namespace castor