TapeWriteSingleThread.hpp 10.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/******************************************************************************
 *                      TapeWriteSingleThread.hpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

27
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
28
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
29
#include "castor/tape/tapeserver/threading/Threading.hpp"
30
31
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
32
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
33
#include "castor/legacymsg/RmcProxy.hpp"
34
#include "castor/tape/tapeserver/daemon/GlobalStatusReporter.hpp"
35
#include "castor/tape/tapeserver/daemon/CapabilityUtils.hpp"
36
#include "castor/tape/utils/Timer.hpp"
37
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
38
39
#include <iostream>
#include <stdio.h>
40

David COME's avatar
David COME committed
41
42
43
44
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
45
46
  
// forward declaration
47
class TapeServerReporter;
David COME's avatar
David COME committed
48

49
/**
 * Thread body that writes files to a tape.
 *
 * When run, it mounts the tape in read/write mode, opens a write session,
 * then pops TapeWriteTask objects from the task queue and executes them one
 * by one. The drive is flushed every time one of the two thresholds
 * (number of files, number of bytes) is reached, and once more when the
 * end-of-tasks marker (a NULL task) is received. Flushes, the end of the
 * session and errors are reported through the MigrationReportPacker.
 * The tape is unloaded and unmounted by a TapeCleaning guard, whatever the
 * way run() is left once that guard has been created.
 */
class TapeWriteSingleThread : public TapeSingleThreadInterface<TapeWriteTask> {
public:
  /**
   * Constructor
   * @param drive an interface for manipulating the drive in order
   * to write on the tape (also forwarded to the base class)
   * @param rmc proxy forwarded to the base class; the cleanup code of this
   * class unmounts the tape through it
   * @param gsr reporter forwarded to the base class; it is notified by this
   * class when the tape is mounted for write and unmounted, and is
   * finished at cleanup
   * @param volInfo information about the volume to write to, forwarded to
   * the base class and handed to the write session
   * @param lc the log context, forwarded to the base class
   * @param repPacker the object that will send reports to the client
   * @param capUtils forwarded to the base class
   * @param filesBeforeFlush how many files written before flushing on tape
   * @param bytesBeforeFlush how many bytes written before flushing on tape
   *
   * The last fSeq is not a constructor argument: it is initialised to
   * static_cast<uint64_t>(-1) and has to be provided with setlastFseq().
   * Compression is hard coded to true.
   */
  TapeWriteSingleThread(castor::tape::drives::DriveInterface & drive,
          castor::legacymsg::RmcProxy & rmc,
          TapeServerReporter & gsr,
          const client::ClientInterface::VolumeInfo& volInfo,
          castor::log::LogContext & lc, MigrationReportPacker & repPacker,
          CapabilityUtils &capUtils,
          uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush):
  TapeSingleThreadInterface<TapeWriteTask>(drive, rmc, gsr, volInfo,capUtils, lc),
          m_filesBeforeFlush(filesBeforeFlush),
          m_bytesBeforeFlush(bytesBeforeFlush),
          m_drive(drive), m_reportPacker(repPacker),
          // same value as the former implicit conversion of -1
          m_lastFseq(static_cast<uint64_t>(-1)),
          m_compress(true) {}

  /**
   * Sets the fSeq that will be handed to the write session when it is opened.
   * @param lastFseq the last fSeq already written on the tape, i.e. the
   * starting point of this session
   */
  void setlastFseq(uint64_t lastFseq){
    m_lastFseq=lastFseq;
  }

private:
  /**
   * Guard object whose destructor performs the end-of-session cleanup:
   * unload the tape, unmount it from the library, notify the status
   * reporter, and finally terminate the status reporter.
   * Failures while unloading/unmounting are logged and flagged by setting
   * m_hardarwareStatus to -1; they are not propagated.
   */
  class TapeCleaning{
    TapeWriteSingleThread& m_this;
  public:
    explicit TapeCleaning(TapeWriteSingleThread& parent):m_this(parent){}
    ~TapeCleaning(){
      try{
        // Do the final cleanup
        m_this.m_drive.unloadTape();
        m_this.m_logContext.log(LOG_INFO, "TapeWriteSingleThread : Tape unloaded");
        // And return the tape to the library
        m_this.m_rmc.unmountTape(m_this.m_volInfo.vid, m_this.m_drive.librarySlot);
        m_this.m_logContext.log(LOG_INFO, "TapeWriteSingleThread : tape unmounted");
        m_this.m_gsr.tapeUnmounted();
      }
      catch(const castor::exception::Exception& ex){
        //set it to -1 to notify something failed during the cleaning
        m_this.m_hardarwareStatus = -1;
        castor::log::ScopedParamContainer scoped(m_this.m_logContext);
        scoped.add("exception_message", ex.getMessageValue())
        .add("exception_code",ex.code());
        m_this.m_logContext.log(LOG_ERR, "Exception in TapeWriteSingleThread-TapeCleaning");
      } catch (...) {
        //set it to -1 to notify something failed during the cleaning
        m_this.m_hardarwareStatus = -1;
        m_this.m_logContext.log(LOG_ERR, "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape");
      }

      //then we terminate the global status reporter
      // NOTE(review): this call is outside the try block above; if finish()
      // can throw while this destructor runs during stack unwinding, the
      // program would be terminated - confirm that finish() does not throw.
      m_this.m_gsr.finish();
    }
  };

  /**
   * Function to open the WriteSession
   * If successful, returns a std::auto_ptr on it. A copy of that std::auto_ptr
   * will give the caller the ownership of the opened session (see auto_ptr
   * copy constructor, which has a move semantic).
   * The vid, the last fSeq and the compression flag are added to the log
   * context for the duration of the call.
   * On failure the castor exception is logged and then rethrown unchanged.
   * @return the WriteSession we need to write on tape
   */
  std::auto_ptr<castor::tape::tapeFile::WriteSession> openWriteSession() {
    using castor::log::LogContext;
    using castor::log::Param;
    typedef LogContext::ScopedParam ScopedParam;

    std::auto_ptr<castor::tape::tapeFile::WriteSession> writeSession;

    ScopedParam sp[]={
        ScopedParam(m_logContext, Param("vid",m_vid)),
        ScopedParam(m_logContext, Param("lastFseq", m_lastFseq)),
        ScopedParam(m_logContext, Param("compression", m_compress))
      };
    // sp is only there for its scope-bound effect on the log context
    tape::utils::suppresUnusedVariable(sp);
    try {
      writeSession.reset(
        new castor::tape::tapeFile::WriteSession(m_drive,m_volInfo,m_lastFseq,m_compress)
      );
      m_logContext.log(LOG_INFO, "Tape Write session session successfully started");
    }
    catch (castor::exception::Exception & e) {
      ScopedParam sp0(m_logContext, Param("ErrorMessage", e.getMessageValue()));
      ScopedParam sp1(m_logContext, Param("ErrorCode", e.code()));
      m_logContext.log(LOG_ERR, "Failed to start tape write session");
      // TODO: log and unroll the session
      // TODO: add an unroll mode to the tape read task. (Similar to exec, but pushing blocks marked in error)
      throw;
    }
    return writeSession;
  }

  /**
   * Execute flush on tape, do some log and report the flush to the client
   * @param message the message the log will register
   * @param bytes the number of bytes that have been written since the last flush
   * (for logging)
   * @param files the number of files that have been written since the last flush
   * (also for logging)
   */
  void tapeFlush(const std::string& message,uint64_t bytes,uint64_t files){
    m_drive.flush();
    log::LogContext::ScopedParam sp0(m_logContext, log::Param("files", files));
    log::LogContext::ScopedParam sp1(m_logContext, log::Param("bytes", bytes));
    m_logContext.log(LOG_INFO,message);
    m_reportPacker.reportFlush();
  }

  /**
   * Body of the thread: mounts the tape, opens the write session, then
   * executes the queued tasks until the NULL end-of-tasks marker is popped.
   * A flush is triggered each time one of the thresholds is reached and
   * unconditionally at the end.
   * Any castor exception ends the session: the remaining tasks are drained
   * (their memory blocks are put back in circulation), the error is logged
   * and reported to the client as an end of session with errors.
   */
  virtual void run() {
    // Becomes true once the NULL end-of-tasks marker has been taken out of
    // the queue. The error handler must not try to drain the queue after
    // that point: no further task is expected, so popping again would
    // presumably block forever (assumes BlockingQueue::pop() waits on an
    // empty queue).
    bool endOfTasksReached = false;
    try
    {
      m_logContext.pushOrReplace(log::Param("thread", "TapeWrite"));
      setCapabilities();

      // Created before mounting so that the cleanup also runs when the
      // mount or any later step throws.
      TapeCleaning cleaner(*this);
      mountTape(castor::legacymsg::RmcProxy::MOUNT_MODE_READWRITE);
      waitForDrive();
      // Then we have to initialise the tape write session
      std::auto_ptr<castor::tape::tapeFile::WriteSession> writeSession(openWriteSession());

      //log and notify
      m_logContext.log(LOG_INFO, "Starting tape write thread");
      m_gsr.tapeMountedForWrite();
      // counters of what has been written since the last flush
      uint64_t bytes=0;
      uint64_t files=0;
      std::auto_ptr<TapeWriteTask> task;

      tape::utils::Timer timer;
      TaskWatchDog watchdog(m_logContext);

      while(1) {
        //get a task
        task.reset(m_tasks.pop());

        //if is the end
        if(NULL==task.get()) {
          endOfTasksReached = true;
          //we flush without asking
          tapeFlush("No more data to write on tape, unconditional flushing to the client",bytes,files);
          //end of session + log
          m_reportPacker.reportEndOfSession();
          log::LogContext::ScopedParam sp0(m_logContext, log::Param("time taken", timer.secs()));
          m_logContext.log(LOG_DEBUG, "writing data to tape has finished");
          break;
        }

        task->execute(*writeSession,m_reportPacker,m_logContext);

        //raise counters
        files++;
        bytes+=task->fileSize();

        //if one counter is above a threshold, then we flush
        if (files >= m_filesBeforeFlush || bytes >= m_bytesBeforeFlush) {
          tapeFlush("Normal flush because thresholds was reached",bytes,files);
          files=0;
          bytes=0;
        }
      } //end of while(1))
    } //end of try
    catch(const castor::exception::Exception& e){
      //we end there because write session could not be opened or because a task failed

      //first empty all the tasks and circulate mem blocks
      //(only if the end marker is still to come, see endOfTasksReached)
      if (!endOfTasksReached) {
        while(1) {
          std::auto_ptr<TapeWriteTask>  task(m_tasks.pop());
          if(task.get()==NULL) {
            break;
          }
          task->circulateMemBlocks(m_logContext);
        }
      }
      //then log
      log::LogContext::ScopedParam sp1(m_logContext, log::Param("exceptionCode", e.code()));
      log::LogContext::ScopedParam sp2(m_logContext, log::Param("error_MessageValue", e.getMessageValue()));
      m_logContext.log(LOG_ERR,"An error occurred during TapwWriteSingleThread::execute");

      m_reportPacker.reportEndOfSessionWithErrors(e.what(),e.code());
    }
  }

  //m_filesBeforeFlush and m_bytesBeforeFlush are thresholds for flushing
  //the first one crossed will trigger the flush on tape

  ///how many files written before flushing on tape
  const uint64_t m_filesBeforeFlush;

  ///how many bytes written before flushing on tape
  const uint64_t m_bytesBeforeFlush;

  ///an interface for manipulating all type of drives
  castor::tape::drives::DriveInterface& m_drive;

  ///the object that will send reports to the client
  MigrationReportPacker & m_reportPacker;

  /**
   * The last fSeq that has been written on the tape = the starting point
   * of our session. The last fSeq is computed by subtracting 1 from the fSeq
   * of the first file to migrate we receive. That part is done by
   * MigrationTaskInjector::synchronousInjection. Thus, it is computed in
   * that function and retrieved/set within DataTransferSession executeWrite
   * after we make sure synchronousInjection returned true.
   *
   * It should be const, but it cannot be, because it is only known after
   * construction and is assigned through setlastFseq().
   */
  uint64_t m_lastFseq;

  /**
   * Should the compression be enabled ? This is currently hard coded to true
   */
  const bool m_compress;
};
David COME's avatar
David COME committed
277
}}}}