TapeWriteSingleThread.cpp 28.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
25
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
26

David COME's avatar
David COME committed
27
28
29
//------------------------------------------------------------------------------
//constructor
//------------------------------------------------------------------------------
30
castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeWriteSingleThread(
31
castor::tape::tapeserver::drive::DriveInterface & drive, 
32
        cta::mediachanger::MediaChangerFacade & mc,
33
        TapeServerReporter & tsr,
34
        MigrationWatchDog & mwd,
35
        const VolumeInfo& volInfo,
Victor Kotlyar's avatar
Victor Kotlyar committed
36
        cta::log::LogContext & lc,
Daniele Kruse's avatar
Daniele Kruse committed
37
        MigrationReportPacker & repPacker,
38
        cta::server::ProcessCap &capUtils,
39
        uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush,
40
41
        const bool useLbp, const std::string & externalEncryptionKeyScript,
        const cta::ArchiveMount & archiveMount):
42
43
        TapeSingleThreadInterface<TapeWriteTask>(drive, mc, tsr, volInfo, 
          capUtils, lc, externalEncryptionKeyScript),
44
45
        m_filesBeforeFlush(filesBeforeFlush),
        m_bytesBeforeFlush(bytesBeforeFlush),
Daniele Kruse's avatar
Daniele Kruse committed
46
47
        m_drive(drive),
        m_reportPacker(repPacker),
48
        m_lastFseq(-1),
49
        m_compress(true),
50
        m_useLbp(useLbp),
51
52
        m_watchdog(mwd),
        m_archiveMount(archiveMount){}
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

//------------------------------------------------------------------------------
//TapeCleaning::~TapeCleaning()
//------------------------------------------------------------------------------
/**
 * End-of-session cleanup of the tape write thread. It runs when the
 * TapeCleaning object created in run() goes out of scope, either normally or
 * during stack unwinding.
 *
 * Steps, in order:
 *  1. Try to disable drive encryption. A cta::exception::Exception is logged,
 *     not propagated.
 *  2. Report the drive as CleaningUp. Log tape alerts and SCSI metrics, best
 *     effort.
 *  3. Report the Unmounting session state. If a tape is in place, unload it
 *     (skipped for manual libraries), dismount it, disable LBP and report the
 *     drive Up.
 *  4. If step 3 fails, mark the drive as down, report it Down, log the
 *     failure and count the failing stage in the watchdog.
 *  5. Always finish the initial process reporter.
 *
 * NOTE(review): the reportDriveStatus()/reportState()/finish() calls outside
 * the try blocks can still throw. An exception escaping a destructor calls
 * std::terminate.
 */
castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeCleaning(){
  // Disable encryption (or at least try)
  try {
    if (m_this.m_encryptionControl.disable(m_this.m_drive))
      m_this.m_logContext.log(cta::log::INFO, "Turned encryption off before unmounting");
  } catch (cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer scoped(m_this.m_logContext);
    scoped.add("exceptionError", ex.getMessageValue());
    m_this.m_logContext.log(cta::log::ERR, "Failed to turn off encryption before unmounting");
  }
  m_this.m_stats.encryptionControlTime += m_timer.secs(cta::utils::Timer::resetCounter);
  m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::CleaningUp, m_this.m_logContext);
  // Log (safely, exception-wise) the tape alerts (if any) at the end of the session
  try { m_this.logTapeAlerts(); } catch (...) {}
  // Log (safely, exception-wise) the tape SCSI metrics at the end of the session
  try { m_this.logSCSIMetrics(); } catch(...) {}
  m_this.m_initialProcess.reportState(cta::tape::session::SessionState::Unmounting,
      cta::tape::session::SessionType::Archive);
  // This out-of-try-catch variable records the stage of the unmount we are
  // in, so the matching error can be counted if that stage fails.
  // No error is recorded for an empty string.
  std::string currentErrorToCount = "Error_tapeUnload";
  try{
    // Do the final cleanup
    // First check that a tape is actually present in the drive. We can get here
    // after failing to mount (library error) in which case there is nothing to
    // do (and trying to unmount will only lead to a failure.)
    const uint32_t waitMediaInDriveTimeout = 60; // presumably seconds — confirm in DriveInterface
    try {
      m_this.m_drive.waitUntilReady(waitMediaInDriveTimeout);
    } catch (cta::exception::TimeOut &) {}
    if (!m_this.m_drive.hasTapeInPlace()) {
      // Nothing to unload or dismount: fall through to finish() below.
      m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: No tape to unload");
    } else {
      // in the special case of a "manual" mode tape, we should skip the unload too.
      if (cta::mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.librarySlot().getLibraryType()) {
        m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unloading, m_this.m_logContext);
        m_this.m_drive.unloadTape();
        m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: Tape unloaded");
      } else {
        m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: Tape NOT unloaded (manual mode)");
      }
      m_this.m_stats.unloadTime += m_timer.secs(cta::utils::Timer::resetCounter);
      // And return the tape to the library
      // In case of manual mode, this will be filtered by the rmc daemon
      // (which will do nothing)
      currentErrorToCount = "Error_tapeDismount";
      m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, m_this.m_logContext);
      m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.librarySlot());
      m_this.m_drive.disableLogicalBlockProtection();
      m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, m_this.m_logContext);
      m_this.m_stats.unmountTime += m_timer.secs(cta::utils::Timer::resetCounter);
      m_this.m_logContext.log(cta::log::INFO, cta::mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.librarySlot().getLibraryType() ?
        "TapeWriteSingleThread : tape unmounted":"TapeWriteSingleThread : tape NOT unmounted (manual mode)");
      m_this.m_initialProcess.reportState(cta::tape::session::SessionState::ShuttingDown,
        cta::tape::session::SessionType::Archive);
      m_this.m_stats.waitReportingTime += m_timer.secs(cta::utils::Timer::resetCounter);
    }
  }
  catch(const cta::exception::Exception& ex){
    // Notify something failed during the cleaning
    m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, m_this.m_logContext);
    cta::log::ScopedParamContainer scoped(m_this.m_logContext);
    scoped.add("exception_message", ex.getMessageValue());
    m_this.m_logContext.log(cta::log::ERR, "Exception in TapeWriteSingleThread-TapeCleaning. Putting the drive down.");
    // As we do not throw exceptions from here, the watchdog signalling has
    // to occur from here.
    try {
      if (currentErrorToCount.size()) {
        m_this.m_watchdog.addToErrorCount(currentErrorToCount);
      }
    } catch (...) {}
  } catch (...) {
    // Notify something failed during the cleaning
    m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, m_this.m_logContext);
    m_this.m_logContext.log(cta::log::ERR, "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape. Putting the drive down.");
    try {
      if (currentErrorToCount.size()) {
        m_this.m_watchdog.addToErrorCount(currentErrorToCount);
      }
    } catch (...) {}
  }
  //then we terminate the global status reporter
  m_this.m_initialProcess.finish();
}

David COME's avatar
David COME committed
147
148
149
//------------------------------------------------------------------------------
//setlastFseq
//------------------------------------------------------------------------------
150
151
152
153
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
setlastFseq(uint64_t lastFseq){
  m_lastFseq=lastFseq;
}
David COME's avatar
David COME committed
154
155
156
//------------------------------------------------------------------------------
//openWriteSession
//------------------------------------------------------------------------------
157
std::unique_ptr<castor::tape::tapeFile::WriteSession> 
158
castor::tape::tapeserver::daemon::TapeWriteSingleThread::openWriteSession() {
Victor Kotlyar's avatar
Victor Kotlyar committed
159
160
  using cta::log::LogContext;
  using cta::log::Param;
161
162
  typedef LogContext::ScopedParam ScopedParam;
  
163
  std::unique_ptr<castor::tape::tapeFile::WriteSession> writeSession;
164
165
166
  
  ScopedParam sp[]={
    ScopedParam(m_logContext, Param("lastFseq", m_lastFseq)),
167
    ScopedParam(m_logContext, Param("compression", m_compress)),
168
    ScopedParam(m_logContext, Param("useLbp", m_useLbp)),
169
170
171
172
  };
  tape::utils::suppresUnusedVariable(sp);
  try {
    writeSession.reset(
173
174
    new castor::tape::tapeFile::WriteSession(m_drive, m_volInfo, m_lastFseq,
      m_compress, m_useLbp)
175
176
    );
  }
177
  catch (cta::exception::Exception & e) {
178
    ScopedParam sp0(m_logContext, Param("ErrorMessage", e.getMessageValue()));
Victor Kotlyar's avatar
Victor Kotlyar committed
179
    m_logContext.log(cta::log::ERR, "Failed to start tape write session");
180
181
182
183
184
185
    // TODO: log and unroll the session
    // TODO: add an unroll mode to the tape read task. (Similar to exec, but pushing blocks marked in error)
    throw;
  }
  return writeSession;
}
David COME's avatar
David COME committed
186
187
188
//------------------------------------------------------------------------------
//tapeFlush
//------------------------------------------------------------------------------
189
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
Eric Cano's avatar
Eric Cano committed
190
tapeFlush(const std::string& message,uint64_t bytes,uint64_t files,
Victor Kotlyar's avatar
Victor Kotlyar committed
191
  cta::utils::Timer & timer)
Eric Cano's avatar
Eric Cano committed
192
{
193
  m_drive.flush();
Victor Kotlyar's avatar
Victor Kotlyar committed
194
  double flushTime = timer.secs(cta::utils::Timer::resetCounter);
Victor Kotlyar's avatar
Victor Kotlyar committed
195
  cta::log::ScopedParamContainer params(m_logContext);
Eric Cano's avatar
Eric Cano committed
196
197
  params.add("files", files)
        .add("bytes", bytes)
198
        .add("flushTime", flushTime);
Victor Kotlyar's avatar
Victor Kotlyar committed
199
  m_logContext.log(cta::log::INFO,message);
Eric Cano's avatar
Eric Cano committed
200
  m_stats.flushTime += flushTime;
201
202
  

203
  m_reportPacker.reportFlush(m_drive.getCompression(), m_logContext);
204
  m_drive.clearCompressionStats();
205
}
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236

//------------------------------------------------------------------------
//   logAndCheckTapeAlertsForWrite
//------------------------------------------------------------------------------
bool castor::tape::tapeserver::daemon::TapeWriteSingleThread::
logAndCheckTapeAlertsForWrite() {
  std::vector<uint16_t> tapeAlertCodes = m_drive.getTapeAlertCodes();
  if (tapeAlertCodes.empty()) return false;
  size_t alertNumber = 0;
  // Log tape alerts in the logs.
  std::vector<std::string> tapeAlerts = m_drive.getTapeAlerts(tapeAlertCodes);
  for (std::vector<std::string>::iterator ta=tapeAlerts.begin();
          ta!=tapeAlerts.end();ta++)
  {
    cta::log::ScopedParamContainer params(m_logContext);
    params.add("tapeAlert",*ta)
          .add("tapeAlertNumber", alertNumber++)
          .add("tapeAlertCount", tapeAlerts.size());
    m_logContext.log(cta::log::WARNING, "Tape alert detected");
  }
  // Add tape alerts in the tape log parameters
  std::vector<std::string> tapeAlertsCompact = 
    m_drive.getTapeAlertsCompact(tapeAlertCodes);
  for (std::vector<std::string>::iterator tac=tapeAlertsCompact.begin();
          tac!=tapeAlertsCompact.end();tac++)
  {
    countTapeLogError(std::string("Error_")+*tac);
  }
  return(m_drive.tapeAlertsCriticalForWrite(tapeAlertCodes));
}

237
238
239
240
241
242
243
//------------------------------------------------------------------------------
//   isTapeWritable
//-----------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
isTapeWritable() const {
// check that drive is not write protected
      if(m_drive.isWriteProtected()) {   
244
        cta::exception::Exception ex;
245
246
247
248
249
        ex.getMessage() <<
                "End session with error. Drive is write protected. Aborting labelling...";
        throw ex;
      }
}
250
251
252
253
254

//-----------------------------------------------------------------------------
// volumeModeToString
//-----------------------------------------------------------------------------
const char *castor::tape::tapeserver::daemon::TapeWriteSingleThread::
255
  mountTypeToString(const cta::common::dataStructures::MountType mountType) const throw() {
256
  switch(mountType) {
257
258
259
  case cta::common::dataStructures::MountType::Retrieve: return "Retrieve";
  case cta::common::dataStructures::MountType::Archive : return "Archive";
  case cta::common::dataStructures::MountType::Label: return "Label";
260
261
262
263
  default                      : return "UNKNOWN";
  }
}

264
//------------------------------------------------------------------------------
David COME's avatar
David COME committed
265
266
267
//------------------------------------------------------------------------------
//run
//------------------------------------------------------------------------------
268
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
Victor Kotlyar's avatar
Victor Kotlyar committed
269
  cta::log::ScopedParamContainer threadGlobalParams(m_logContext);
270
  threadGlobalParams.add("thread", "TapeWrite");
Victor Kotlyar's avatar
Victor Kotlyar committed
271
  cta::utils::Timer timer, totalTimer;
272
273
274
275
  // This out-of-try-catch variables allows us to record the stage of the 
  // process we're in, and to count the error if it occurs.
  // We will not record errors for an empty string. This will allow us to
  // prevent counting where error happened upstream.
276
  std::string currentErrorToCount = "Error_tapeMountForWrite";
277
278
  try
  {
279
    // Report the parameters of the session to the main thread
Victor Kotlyar's avatar
Victor Kotlyar committed
280
    typedef cta::log::Param Param;
281
    m_watchdog.addParameter(Param("tapeVid", m_volInfo.vid));
282
    m_watchdog.addParameter(Param("mountType", mountTypeToString(m_volInfo.mountType)));
283
284
    m_watchdog.addParameter(Param("mountId", m_volInfo.mountId));
    m_watchdog.addParameter(Param("volReqId", m_volInfo.mountId));
285
    
286
287
    m_watchdog.addParameter(Param("tapeDrive",m_drive.config.unitName));
    m_watchdog.addParameter(Param("vendor",m_archiveMount.getVendor()));
288
289
290
291
    m_watchdog.addParameter(Param("vo",m_archiveMount.getVo()));
    m_watchdog.addParameter(Param("mediaType",m_archiveMount.getMediaType()));
    m_watchdog.addParameter(Param("tapePool",m_archiveMount.getPoolName()));
    m_watchdog.addParameter(Param("logicalLibrary",m_drive.config.logicalLibrary));
292
293
    m_watchdog.addParameter(Param("capacityInBytes",m_archiveMount.getCapacityInBytes()));
      
294
295
296
297
    // Set the tape thread time in the watchdog for total time estimation in case
    // of crash
    m_watchdog.updateThreadTimer(totalTimer);
    
298
    //pair of brackets to create an artificial scope for the tape cleaning
Eric Cano's avatar
Eric Cano committed
299
300
    {
      //log and notify
Victor Kotlyar's avatar
Victor Kotlyar committed
301
      m_logContext.log(cta::log::INFO, "Starting tape write thread");
302
      
Eric Cano's avatar
Eric Cano committed
303
304
305
      // The tape will be loaded 
      // it has to be unloaded, unmounted at all cost -> RAII
      // will also take care of the TapeServerReporter
306
      // 
Eric Cano's avatar
Eric Cano committed
307
      TapeCleaning cleaner(*this, timer);
308
      m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Mounting, m_logContext);
Eric Cano's avatar
Eric Cano committed
309
310
      // Before anything, the tape should be mounted
      // This call does the logging of the mount
311
312
313
314
315
316
317
318
319
      cta::log::ScopedParamContainer params(m_logContext);
      params.add("vo",m_archiveMount.getVo());
      params.add("mediaType",m_archiveMount.getMediaType());
      params.add("tapePool",m_archiveMount.getPoolName());
      params.add("logicalLibrary",m_drive.config.logicalLibrary);
      params.add("mountType",mountTypeToString(m_volInfo.mountType));
      params.add("vendor",m_archiveMount.getVendor());
      params.add("capacityInBytes",m_archiveMount.getCapacityInBytes());
      m_logContext.log(cta::log::INFO, "Tape session started");
320
      mountTapeReadWrite();
321
      currentErrorToCount = "Error_tapeLoad";
Eric Cano's avatar
Eric Cano committed
322
      waitForDrive();
323
      currentErrorToCount = "Error_checkingTapeAlert";
324
325
326
      if(logAndCheckTapeAlertsForWrite()) {
        throw cta::exception::Exception("Aborting migration session in"
          " presence of critical tape alerts");
327
      }
328

329
      currentErrorToCount = "Error_tapeNotWriteable";
330
331
      isTapeWritable();
      
Victor Kotlyar's avatar
Victor Kotlyar committed
332
      m_stats.mountTime += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
333
      {
Victor Kotlyar's avatar
Victor Kotlyar committed
334
        cta::log::ScopedParamContainer scoped(m_logContext);
335
        scoped.add("mountTime", m_stats.mountTime);
Victor Kotlyar's avatar
Victor Kotlyar committed
336
        m_logContext.log(cta::log::INFO, "Tape mounted and drive ready");
337
      }
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
      try {
        currentErrorToCount = "Error_tapeEncryptionEnable";
        // We want those scoped params to last for the whole mount.
        // This will allow each written file to be logged with its encryption
        // status:
        cta::log::ScopedParamContainer encryptionLogParams(m_logContext);
        {
          auto encryptionStatus = m_encryptionControl.enable(m_drive, m_volInfo.vid,
                                                             EncryptionControl::SetTag::SET_TAG);
 
          if (encryptionStatus.on) {
            encryptionLogParams.add("encryption", "on")
              .add("encryptionKey", encryptionStatus.keyName)
              .add("stdout", encryptionStatus.stdout);
            m_logContext.log(cta::log::INFO, "Drive encryption enabled for this mount");
          } else {
            encryptionLogParams.add("encryption", "off");
            m_logContext.log(cta::log::INFO, "Drive encryption not enabled for this mount");
          }
        }
        m_stats.encryptionControlTime += timer.secs(cta::utils::Timer::resetCounter);
      }
360
      catch (cta::exception::Exception &ex) {
361
362
363
364
365
        cta::log::ScopedParamContainer params(m_logContext);
        params.add("ErrorMessage", ex.getMessage().str());
        m_logContext.log(cta::log::ERR, "Drive encryption could not be enabled for this mount.");
        throw;
      }
366
      currentErrorToCount = "Error_tapePositionForWrite";
367
      // Then we have to initialize the tape write session
368
      std::unique_ptr<castor::tape::tapeFile::WriteSession> writeSession(openWriteSession());
Victor Kotlyar's avatar
Victor Kotlyar committed
369
      m_stats.positionTime  += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
370
      {
Victor Kotlyar's avatar
Victor Kotlyar committed
371
        cta::log::ScopedParamContainer scoped(m_logContext);
372
        scoped.add("positionTime", m_stats.positionTime);
373
374
375
376
        scoped.add("useLbp", m_useLbp);
        scoped.add("detectedLbp", writeSession->isTapeWithLbp());

        if (!writeSession->isTapeWithLbp() && m_useLbp) {
Victor Kotlyar's avatar
Victor Kotlyar committed
377
          m_logContext.log(cta::log::INFO, "Tapserver started with LBP support but "
378
379
380
381
            "the tape without LBP label mounted");
        }
        switch(m_drive.getLbpToUse()) {
          case drive::lbpToUse::crc32cReadWrite:
Victor Kotlyar's avatar
Victor Kotlyar committed
382
            m_logContext.log(cta::log::INFO, "Write session initialised with LBP"
383
384
385
386
              " crc32c in ReadWrite mode, tape VID checked and drive positioned"
              " for writing");
            break;
          case drive::lbpToUse::disabled:
Victor Kotlyar's avatar
Victor Kotlyar committed
387
            m_logContext.log(cta::log::INFO, "Write session initialised without LBP"
388
389
390
              ", tape VID checked and drive positioned for writing");
            break;
          default:
Victor Kotlyar's avatar
Victor Kotlyar committed
391
            m_logContext.log(cta::log::ERR, "Write session initialised with "
392
393
394
              "unsupported LBP method, tape VID checked and drive positioned"
              " for writing");
        }
395
      }
396

397
398
      m_initialProcess.reportState(cta::tape::session::SessionState::Running,
        cta::tape::session::SessionType::Archive);
Eric Cano's avatar
Eric Cano committed
399
400
      uint64_t bytes=0;
      uint64_t files=0;
Victor Kotlyar's avatar
Victor Kotlyar committed
401
      m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
402
403
      // Tasks handle their error logging themselves.
      currentErrorToCount = "";
Daniele Kruse's avatar
Daniele Kruse committed
404
      std::unique_ptr<TapeWriteTask> task;   
Eric Cano's avatar
Eric Cano committed
405
      m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring, m_logContext); 
Eric Cano's avatar
Eric Cano committed
406
407
408
      while(1) {
        //get a task
        task.reset(m_tasks.pop());
Victor Kotlyar's avatar
Victor Kotlyar committed
409
        m_stats.waitInstructionsTime += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
410
411
412
413
        //if is the end
        if(NULL==task.get()) {      
          //we flush without asking
          tapeFlush("No more data to write on tape, unconditional flushing to the client",bytes,files,timer);
Victor Kotlyar's avatar
Victor Kotlyar committed
414
          m_stats.flushTime += timer.secs(cta::utils::Timer::resetCounter);
Victor Kotlyar's avatar
Victor Kotlyar committed
415
416
          cta::log::LogContext::ScopedParam sp0(m_logContext, cta::log::Param("tapeThreadDuration", totalTimer.secs()));
          m_logContext.log(cta::log::DEBUG, "writing data to tape has finished");
Eric Cano's avatar
Eric Cano committed
417
418
          break;
        }
419
        task->execute(*writeSession,m_reportPacker,m_watchdog,m_logContext,timer);
420
421
        // Add the tasks counts to the session's
        m_stats.add(task->getTaskStats());
422
        // Transmit the statistics to the watchdog thread
423
        m_watchdog.updateStatsWithoutDeliveryTime(m_stats);
Eric Cano's avatar
Eric Cano committed
424
425
426
427
428
429
        // Increase local flush counters (session counters are incremented by
        // the task)
        files++;
        bytes+=task->fileSize();
        //if one flush counter is above a threshold, then we flush
        if (files >= m_filesBeforeFlush || bytes >= m_bytesBeforeFlush) {
430
          currentErrorToCount = "Error_tapeFlush";
Eric Cano's avatar
Eric Cano committed
431
432
433
          tapeFlush("Normal flush because thresholds was reached",bytes,files,timer);
          files=0;
          bytes=0;
434
          currentErrorToCount = "";
Eric Cano's avatar
Eric Cano committed
435
436
437
        }
      } //end of while(1))
    }
438

Eric Cano's avatar
Eric Cano committed
439
440
    // The session completed successfully, and the cleaner (unmount) executed
    // at the end of the previous block. Log the results.
Victor Kotlyar's avatar
Victor Kotlyar committed
441
    cta::log::ScopedParamContainer params(m_logContext);
442
    params.add("status", "success");
443
    m_stats.totalTime = totalTimer.secs();
444
    m_stats.deliveryTime = m_stats.totalTime;
Victor Kotlyar's avatar
Victor Kotlyar committed
445
    logWithStats(cta::log::INFO, "Tape thread complete",params);
446
447
    // Report one last time the stats, after unloading/unmounting.
    m_watchdog.updateStats(m_stats);
448
449
    //end of session + log
    m_reportPacker.reportEndOfSession(m_logContext);
450
  } //end of try 
451
452
453
454
455
456
457
458
459
460
461
462
463
  catch(const cta::exception::Exception& e){
    //we end there because write session could not be opened 
    //or because a task failed or because flush failed
    
    // First off, indicate the problem to the task injector so it does not inject
    // more work in the pipeline
    // If the problem did not originate here, we just re-flag the error, and
    // this has no effect, but if we had a problem with a non-file operation
    // like mounting the tape, then we have to signal the problem to the disk
    // side and the task injector, which will trigger the end of session.
    m_injector->setErrorFlag();
    // We can still update the session stats one last time (unmount timings
    // should have been updated by the RAII cleaner/unmounter).
464
    m_watchdog.updateStatsWithoutDeliveryTime(m_stats);
465
466
467
468
469
470
    
    // If we reached the end of tape, this is not an error (ENOSPC)
    try {
      // If it's not the error we're looking for, we will go about our business
      // in the catch section. dynamic cast will throw, and we'll do ourselves
      // if the error code is not the one we want.
471
472
      const cta::exception::Errnum & en = 
        dynamic_cast<const cta::exception::Errnum &>(e);
473
474
475
476
477
      if(en.errorNumber()!= ENOSPC) {
        throw 0;
      }
      // This is indeed the end of the tape. Not an error.
      m_watchdog.setErrorCount("Info_tapeFilledUp",1);
478
      m_reportPacker.reportTapeFull(m_logContext);
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
    } catch (...) {
      // The error is not an ENOSPC, so it is, indeed, an error.
      // If we got here with a new error, currentErrorToCount will be non-empty,
      // and we will pass the error name to the watchdog.
      if(currentErrorToCount.size()) {
        m_watchdog.addToErrorCount(currentErrorToCount);
      }
    }
    
    //first empty all the tasks and circulate mem blocks
    while(1) {
      std::unique_ptr<TapeWriteTask>  task(m_tasks.pop());
      if(task.get()==NULL) {
        break;
      }
      task->circulateMemBlocks();
    }
    // Prepare the standard error codes for the session
    std::string errorMessage(e.getMessageValue());
498
    int errorCode(666);
499
500
501
    // Override if we got en ENOSPC error (end of tape)
    // This is 
    try {
502
503
      const cta::exception::Errnum & errnum = 
          dynamic_cast<const cta::exception::Errnum &> (e);
504
505
506
507
508
509
      if (ENOSPC == errnum.errorNumber()) {
        errorCode = ENOSPC;
        errorMessage = "End of migration due to tape full";
      }
    } catch (...) {}
    // then log the end of write thread
Victor Kotlyar's avatar
Victor Kotlyar committed
510
    cta::log::ScopedParamContainer params(m_logContext);
511
512
513
    params.add("status", "error")
          .add("ErrorMesage", errorMessage);
    m_stats.totalTime = totalTimer.secs();
Victor Kotlyar's avatar
Victor Kotlyar committed
514
    logWithStats(cta::log::INFO, "Tape thread complete",
515
            params);
516
    m_reportPacker.reportEndOfSessionWithErrors(errorMessage,errorCode, m_logContext);
517
  }      
518
}
519

520
521
522
//------------------------------------------------------------------------------
//logWithStats
//------------------------------------------------------------------------------
523
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::logWithStats(
Victor Kotlyar's avatar
Victor Kotlyar committed
524
int level,const std::string& msg, cta::log::ScopedParamContainer& params){
525
  params.add("type", "write")
526
        .add("tapeVid", m_volInfo.vid)
527
528
529
530
        .add("mountTime", m_stats.mountTime)
        .add("positionTime", m_stats.positionTime)
        .add("waitInstructionsTime", m_stats.waitInstructionsTime)
        .add("checksumingTime", m_stats.checksumingTime)
531
        .add("readWriteTime", m_stats.readWriteTime)
532
533
534
535
536
        .add("waitDataTime", m_stats.waitDataTime)
        .add("waitReportingTime", m_stats.waitReportingTime)
        .add("flushTime", m_stats.flushTime)
        .add("unloadTime", m_stats.unloadTime)
        .add("unmountTime", m_stats.unmountTime)
537
        .add("encryptionControlTime", m_stats.encryptionControlTime)
538
        .add("transferTime", m_stats.transferTime())
539
        .add("totalTime", m_stats.totalTime)
540
541
542
        .add("dataVolume", m_stats.dataVolume)
        .add("headerVolume", m_stats.headerVolume)
        .add("files", m_stats.filesCount)
543
544
545
546
        .add("payloadTransferSpeedMBps", m_stats.totalTime?1.0*m_stats.dataVolume
                /1000/1000/m_stats.totalTime:0.0)
        .add("driveTransferSpeedMBps", m_stats.totalTime?1.0*(m_stats.dataVolume+m_stats.headerVolume)
                /1000/1000/m_stats.totalTime:0.0);
547
  m_logContext.moveToTheEndIfPresent("status");
548
549
  m_logContext.log(level, msg);
}
550
551
552
553
554
555
556
557
558
559

//------------------------------------------------------------------------------
//logSCSIMetrics
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::logSCSIMetrics() {
  try {
    // mount general statistics
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    // get mount general stats
560
    std::map<std::string, uint64_t> scsi_write_metrics_hash = m_drive.getTapeWriteErrors();
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
    appendMetricsToScopedParams(scopedContainer, scsi_write_metrics_hash);
    std::map<std::string, uint32_t> scsi_nonmedium_metrics_hash = m_drive.getTapeNonMediumErrors();
    appendMetricsToScopedParams(scopedContainer, scsi_nonmedium_metrics_hash);
    logSCSIStats("Logging mount general statistics",
      scsi_write_metrics_hash.size() + scsi_nonmedium_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
    scoped.add("exception_message", ex.getMessageValue());
    m_logContext.log(cta::log::ERR, "Exception in logging mount general statistics");
  }

  // drive statistics
  try {
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    // get drive stats
    std::map<std::string,float> scsi_quality_metrics_hash = m_drive.getQualityStats();
    appendMetricsToScopedParams(scopedContainer, scsi_quality_metrics_hash);
    std::map<std::string,uint32_t> scsi_drive_metrics_hash = m_drive.getDriveStats();
    appendMetricsToScopedParams(scopedContainer, scsi_drive_metrics_hash);
    logSCSIStats("Logging drive statistics",
      scsi_quality_metrics_hash.size()+scsi_drive_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
    scoped.add("exception_message", ex.getMessageValue());
    m_logContext.log(cta::log::ERR, "Exception in logging drive statistics");
  }

  // volume statistics
  try {
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    std::map<std::string,uint32_t> scsi_metrics_hash = m_drive.getVolumeStats();
    appendMetricsToScopedParams(scopedContainer, scsi_metrics_hash);
    logSCSIStats("Logging volume statistics", scsi_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
    scoped.add("exception_message", ex.getMessageValue());
    m_logContext.log(cta::log::ERR, "Exception in logging volume statistics");
  }
}