TapeWriteSingleThread.cpp 29.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
25
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
26

David COME's avatar
David COME committed
27
28
29
//------------------------------------------------------------------------------
//constructor
//------------------------------------------------------------------------------
30
// Builds the archive (write) tape thread. Drive, media changer, reporter,
// volume info, capabilities, log context, encryption key script and tape load
// timeout are forwarded to the generic single-thread base; the flush
// thresholds, LBP setting, report packer, watchdog and archive mount are kept
// locally. m_lastFseq starts at -1 until setlastFseq() is called, and
// compression is always requested.
castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeWriteSingleThread(
  castor::tape::tapeserver::drive::DriveInterface & drive,
  cta::mediachanger::MediaChangerFacade & mc,
  TapeServerReporter & tsr,
  MigrationWatchDog & mwd,
  const VolumeInfo& volInfo,
  cta::log::LogContext & lc,
  MigrationReportPacker & repPacker,
  cta::server::ProcessCap &capUtils,
  uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush,
  const bool useLbp, const std::string & externalEncryptionKeyScript,
  const cta::ArchiveMount & archiveMount,
  const uint64_t tapeLoadTimeout)
  : TapeSingleThreadInterface<TapeWriteTask>(drive, mc, tsr, volInfo,
      capUtils, lc, externalEncryptionKeyScript, tapeLoadTimeout),
    m_filesBeforeFlush(filesBeforeFlush),
    m_bytesBeforeFlush(bytesBeforeFlush),
    m_drive(drive),
    m_reportPacker(repPacker),
    m_lastFseq(-1),
    m_compress(true),
    m_useLbp(useLbp),
    m_watchdog(mwd),
    m_archiveMount(archiveMount) {
  // All state is established by the initializer list above.
}
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

//------------------------------------------------------------------------------
//TapeCleaning::~TapeCleaning()
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeCleaning(){
  // End-of-session cleanup for the write thread, run when the cleaner object
  // goes out of scope: disable encryption, log tape alerts and SCSI metrics,
  // then unload and dismount the tape (skipping the unload in manual mode),
  // reporting the successive drive states along the way. Failures during
  // unload/dismount are caught here, the drive is put down and the error is
  // counted in the watchdog, since nothing is thrown out of this destructor.

  // Disable encryption (or at least try)
  try {
    if (m_this.m_encryptionControl.disable(m_this.m_drive))
      m_this.m_logContext.log(cta::log::INFO, "Turned encryption off before unmounting");
  } catch (cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer scoped(m_this.m_logContext);
    scoped.add("exceptionError", ex.getMessageValue());
    m_this.m_logContext.log(cta::log::ERR, "Failed to turn off encryption before unmounting");
  } catch (...) {
    // A non-CTA exception escaping a (implicitly noexcept) destructor would
    // call std::terminate: log it and carry on with the cleanup instead.
    m_this.m_logContext.log(cta::log::ERR, "Failed to turn off encryption before unmounting");
  }
  m_this.m_stats.encryptionControlTime += m_timer.secs(cta::utils::Timer::resetCounter);
  m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::CleaningUp, cta::nullopt, m_this.m_logContext);
  // Log (safely, exception-wise) the tape alerts (if any) at the end of the session
  try { m_this.logTapeAlerts(); } catch (...) {}
  // Log (safely, exception-wise) the tape SCSI metrics at the end of the session
  try { m_this.logSCSIMetrics(); } catch(...) {}
  m_this.m_initialProcess.reportState(cta::tape::session::SessionState::Unmounting,
      cta::tape::session::SessionType::Archive);
  // This out-of-try-catch variable records the stage of the process we are
  // in, so that the error can be counted if it occurs.
  // We will not record errors for an empty string. This allows us to
  // prevent counting where the error happened upstream.
  std::string currentErrorToCount = "Error_tapeUnload";
  try{
    // Do the final cleanup
    // First check that a tape is actually present in the drive. We can get here
    // after failing to mount (library error) in which case there is nothing to
    // do (and trying to unmount will only lead to a failure.)
    const uint32_t waitMediaInDriveTimeout = m_this.m_tapeLoadTimeout;
    try {
      m_this.m_drive.waitUntilReady(waitMediaInDriveTimeout);
    } catch (cta::exception::TimeOut &) {}
    if (!m_this.m_drive.hasTapeInPlace()) {
      m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: No tape to unload");
      goto done;
    }
    // in the special case of a "manual" mode tape, we should skip the unload too.
    if (cta::mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.librarySlot().getLibraryType()) {
      m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unloading,cta::nullopt, m_this.m_logContext);
      m_this.m_drive.unloadTape();
      m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: Tape unloaded");
    } else {
      m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: Tape NOT unloaded (manual mode)");
    }
    m_this.m_stats.unloadTime += m_timer.secs(cta::utils::Timer::resetCounter);
    // And return the tape to the library
    // In case of manual mode, this will be filtered by the rmc daemon
    // (which will do nothing)
    currentErrorToCount = "Error_tapeDismount";
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, cta::nullopt, m_this.m_logContext);
    m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.librarySlot());
    m_this.m_drive.disableLogicalBlockProtection();
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, cta::nullopt, m_this.m_logContext);
    m_this.m_stats.unmountTime += m_timer.secs(cta::utils::Timer::resetCounter);
    m_this.m_logContext.log(cta::log::INFO, cta::mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.librarySlot().getLibraryType() ?
      "TapeWriteSingleThread : tape unmounted":"TapeWriteSingleThread : tape NOT unmounted (manual mode)");
    m_this.m_initialProcess.reportState(cta::tape::session::SessionState::ShuttingDown,
      cta::tape::session::SessionType::Archive);
    m_this.m_stats.waitReportingTime += m_timer.secs(cta::utils::Timer::resetCounter);
  }
  catch(const cta::exception::Exception& ex){
    // Notify something failed during the cleaning
    m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
    const int logLevel = cta::log::ERR;
    const std::string errorMsg = "Exception in TapeWriteSingleThread-TapeCleaning. Putting the drive down.";
    cta::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel,errorMsg);
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down,reason, m_this.m_logContext);
    cta::log::ScopedParamContainer scoped(m_this.m_logContext);
    scoped.add("exceptionMessage", ex.getMessageValue());
    m_this.m_logContext.log(logLevel, errorMsg);
    // As we do not throw exceptions from here, the watchdog signalling has
    // to occur from here.
    try {
      if (currentErrorToCount.size()) {
        m_this.m_watchdog.addToErrorCount(currentErrorToCount);
      }
    } catch (...) {}
  } catch (...) {
    // Notify something failed during the cleaning
    const int logLevel = cta::log::ERR;
    const std::string errorMsg = "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape. Putting the drive down.";
    cta::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel,errorMsg);
    m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down,reason,m_this.m_logContext);
    m_this.m_logContext.log(logLevel,errorMsg);
    try {
      if (currentErrorToCount.size()) {
        m_this.m_watchdog.addToErrorCount(currentErrorToCount);
      }
    } catch (...) {}
  }
  done:
    //then we terminate the global status reporter
    m_this.m_initialProcess.finish();
}

David COME's avatar
David COME committed
154
155
156
//------------------------------------------------------------------------------
//setlastFseq
//------------------------------------------------------------------------------
157
158
159
160
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
setlastFseq(uint64_t lastFseq){
  m_lastFseq=lastFseq;
}
David COME's avatar
David COME committed
161
162
163
//------------------------------------------------------------------------------
//openWriteSession
//------------------------------------------------------------------------------
164
std::unique_ptr<castor::tape::tapeFile::WriteSession> 
165
castor::tape::tapeserver::daemon::TapeWriteSingleThread::openWriteSession() {
Victor Kotlyar's avatar
Victor Kotlyar committed
166
167
  using cta::log::LogContext;
  using cta::log::Param;
168
169
  typedef LogContext::ScopedParam ScopedParam;
  
170
  std::unique_ptr<castor::tape::tapeFile::WriteSession> writeSession;
171
172
173
  
  ScopedParam sp[]={
    ScopedParam(m_logContext, Param("lastFseq", m_lastFseq)),
174
    ScopedParam(m_logContext, Param("compression", m_compress)),
175
    ScopedParam(m_logContext, Param("useLbp", m_useLbp)),
176
177
178
179
  };
  tape::utils::suppresUnusedVariable(sp);
  try {
    writeSession.reset(
180
181
    new castor::tape::tapeFile::WriteSession(m_drive, m_volInfo, m_lastFseq,
      m_compress, m_useLbp)
182
183
    );
  }
184
  catch (cta::exception::Exception & e) {
185
    ScopedParam sp0(m_logContext, Param("ErrorMessage", e.getMessageValue()));
Victor Kotlyar's avatar
Victor Kotlyar committed
186
    m_logContext.log(cta::log::ERR, "Failed to start tape write session");
187
188
189
190
191
192
    // TODO: log and unroll the session
    // TODO: add an unroll mode to the tape read task. (Similar to exec, but pushing blocks marked in error)
    throw;
  }
  return writeSession;
}
David COME's avatar
David COME committed
193
194
195
//------------------------------------------------------------------------------
//tapeFlush
//------------------------------------------------------------------------------
196
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
Eric Cano's avatar
Eric Cano committed
197
tapeFlush(const std::string& message,uint64_t bytes,uint64_t files,
Victor Kotlyar's avatar
Victor Kotlyar committed
198
  cta::utils::Timer & timer)
Eric Cano's avatar
Eric Cano committed
199
{
200
  m_drive.flush();
Victor Kotlyar's avatar
Victor Kotlyar committed
201
  double flushTime = timer.secs(cta::utils::Timer::resetCounter);
Victor Kotlyar's avatar
Victor Kotlyar committed
202
  cta::log::ScopedParamContainer params(m_logContext);
Eric Cano's avatar
Eric Cano committed
203
204
  params.add("files", files)
        .add("bytes", bytes)
205
        .add("flushTime", flushTime);
Victor Kotlyar's avatar
Victor Kotlyar committed
206
  m_logContext.log(cta::log::INFO,message);
Eric Cano's avatar
Eric Cano committed
207
  m_stats.flushTime += flushTime;
208
209
  

210
  m_reportPacker.reportFlush(m_drive.getCompression(), m_logContext);
211
  m_drive.clearCompressionStats();
212
}
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243

//------------------------------------------------------------------------
//   logAndCheckTapeAlertsForWrite
//------------------------------------------------------------------------------
// Reads the drive's tape alerts. Each alert is logged as a WARNING (numbered
// from 0, with the total count), and each compact alert name is counted as an
// "Error_<name>" tape log error. Returns whether the drive considers these
// alerts critical for writing; returns false straight away when there are no
// alerts.
bool castor::tape::tapeserver::daemon::TapeWriteSingleThread::
logAndCheckTapeAlertsForWrite() {
  std::vector<uint16_t> tapeAlertCodes = m_drive.getTapeAlertCodes();
  if (tapeAlertCodes.empty()) {
    return false;
  }

  // Log tape alerts in the logs.
  std::vector<std::string> tapeAlerts = m_drive.getTapeAlerts(tapeAlertCodes);
  const size_t alertCount = tapeAlerts.size();
  size_t alertIndex = 0;
  for (const std::string & alert : tapeAlerts) {
    cta::log::ScopedParamContainer params(m_logContext);
    params.add("tapeAlert", alert)
          .add("tapeAlertNumber", alertIndex)
          .add("tapeAlertCount", alertCount);
    m_logContext.log(cta::log::WARNING, "Tape alert detected");
    ++alertIndex;
  }

  // Add tape alerts in the tape log parameters
  for (const std::string & compactAlert : m_drive.getTapeAlertsCompact(tapeAlertCodes)) {
    countTapeLogError(std::string("Error_") + compactAlert);
  }
  return m_drive.tapeAlertsCriticalForWrite(tapeAlertCodes);
}

244
245
246
247
248
249
250
//------------------------------------------------------------------------------
//   isTapeWritable
//-----------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
isTapeWritable() const {
// check that drive is not write protected
      if(m_drive.isWriteProtected()) {   
251
        cta::exception::Exception ex;
252
253
254
255
256
        ex.getMessage() <<
                "End session with error. Drive is write protected. Aborting labelling...";
        throw ex;
      }
}
257
258
259
260
261

//-----------------------------------------------------------------------------
// mountTypeToString
//-----------------------------------------------------------------------------
const char *castor::tape::tapeserver::daemon::TapeWriteSingleThread::
262
  mountTypeToString(const cta::common::dataStructures::MountType mountType) const throw() {
263
  switch(mountType) {
264
  case cta::common::dataStructures::MountType::Retrieve: return "Retrieve";
265
266
  case cta::common::dataStructures::MountType::ArchiveForUser: return "ArchiveForUser";
  case cta::common::dataStructures::MountType::ArchiveForRepack: return "ArchiveForRepack";
267
  case cta::common::dataStructures::MountType::Label: return "Label";
268
269
270
271
  default                      : return "UNKNOWN";
  }
}

272
//------------------------------------------------------------------------------
David COME's avatar
David COME committed
273
274
275
//------------------------------------------------------------------------------
//run
//------------------------------------------------------------------------------
276
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
Victor Kotlyar's avatar
Victor Kotlyar committed
277
  cta::log::ScopedParamContainer threadGlobalParams(m_logContext);
278
  threadGlobalParams.add("thread", "TapeWrite");
Victor Kotlyar's avatar
Victor Kotlyar committed
279
  cta::utils::Timer timer, totalTimer;
280
281
282
283
  // This out-of-try-catch variables allows us to record the stage of the 
  // process we're in, and to count the error if it occurs.
  // We will not record errors for an empty string. This will allow us to
  // prevent counting where error happened upstream.
284
  std::string currentErrorToCount = "Error_tapeMountForWrite";
285
286
  try
  {
287
    // Report the parameters of the session to the main thread
Victor Kotlyar's avatar
Victor Kotlyar committed
288
    typedef cta::log::Param Param;
289
    m_watchdog.addParameter(Param("tapeVid", m_volInfo.vid));
290
    m_watchdog.addParameter(Param("mountType", mountTypeToString(m_volInfo.mountType)));
291
292
    m_watchdog.addParameter(Param("mountId", m_volInfo.mountId));
    m_watchdog.addParameter(Param("volReqId", m_volInfo.mountId));
293
    
294
295
    m_watchdog.addParameter(Param("tapeDrive",m_drive.config.unitName));
    m_watchdog.addParameter(Param("vendor",m_archiveMount.getVendor()));
296
297
298
299
    m_watchdog.addParameter(Param("vo",m_archiveMount.getVo()));
    m_watchdog.addParameter(Param("mediaType",m_archiveMount.getMediaType()));
    m_watchdog.addParameter(Param("tapePool",m_archiveMount.getPoolName()));
    m_watchdog.addParameter(Param("logicalLibrary",m_drive.config.logicalLibrary));
300
301
    m_watchdog.addParameter(Param("capacityInBytes",m_archiveMount.getCapacityInBytes()));
      
302
303
304
305
    // Set the tape thread time in the watchdog for total time estimation in case
    // of crash
    m_watchdog.updateThreadTimer(totalTimer);
    
306
    //pair of brackets to create an artificial scope for the tape cleaning
Eric Cano's avatar
Eric Cano committed
307
308
    {
      //log and notify
Victor Kotlyar's avatar
Victor Kotlyar committed
309
      m_logContext.log(cta::log::INFO, "Starting tape write thread");
310
      
Eric Cano's avatar
Eric Cano committed
311
312
313
      // The tape will be loaded 
      // it has to be unloaded, unmounted at all cost -> RAII
      // will also take care of the TapeServerReporter
314
      // 
Eric Cano's avatar
Eric Cano committed
315
      TapeCleaning cleaner(*this, timer);
316
      m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Mounting,cta::nullopt, m_logContext);
Eric Cano's avatar
Eric Cano committed
317
318
      // Before anything, the tape should be mounted
      // This call does the logging of the mount
319
320
321
322
323
324
325
326
327
      cta::log::ScopedParamContainer params(m_logContext);
      params.add("vo",m_archiveMount.getVo());
      params.add("mediaType",m_archiveMount.getMediaType());
      params.add("tapePool",m_archiveMount.getPoolName());
      params.add("logicalLibrary",m_drive.config.logicalLibrary);
      params.add("mountType",mountTypeToString(m_volInfo.mountType));
      params.add("vendor",m_archiveMount.getVendor());
      params.add("capacityInBytes",m_archiveMount.getCapacityInBytes());
      m_logContext.log(cta::log::INFO, "Tape session started");
328
      mountTapeReadWrite();
329
      currentErrorToCount = "Error_tapeLoad";
330
      cta::utils::Timer tapeLoadTimer;
Eric Cano's avatar
Eric Cano committed
331
      waitForDrive();
332
      double tapeLoadTime = tapeLoadTimer.secs();
333
      currentErrorToCount = "Error_checkingTapeAlert";
334
335
336
      if(logAndCheckTapeAlertsForWrite()) {
        throw cta::exception::Exception("Aborting migration session in"
          " presence of critical tape alerts");
337
      }
338

339
      currentErrorToCount = "Error_tapeNotWriteable";
340
341
      isTapeWritable();
      
Victor Kotlyar's avatar
Victor Kotlyar committed
342
      m_stats.mountTime += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
343
      {
Victor Kotlyar's avatar
Victor Kotlyar committed
344
        cta::log::ScopedParamContainer scoped(m_logContext);
345
        scoped.add("mountTime", m_stats.mountTime);
346
        scoped.add("tapeLoadTime",tapeLoadTime);
Victor Kotlyar's avatar
Victor Kotlyar committed
347
        m_logContext.log(cta::log::INFO, "Tape mounted and drive ready");
348
      }
349
      m_archiveMount.setTapeMounted(m_logContext);
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
      try {
        currentErrorToCount = "Error_tapeEncryptionEnable";
        // We want those scoped params to last for the whole mount.
        // This will allow each written file to be logged with its encryption
        // status:
        cta::log::ScopedParamContainer encryptionLogParams(m_logContext);
        {
          auto encryptionStatus = m_encryptionControl.enable(m_drive, m_volInfo.vid,
                                                             EncryptionControl::SetTag::SET_TAG);
 
          if (encryptionStatus.on) {
            encryptionLogParams.add("encryption", "on")
              .add("encryptionKey", encryptionStatus.keyName)
              .add("stdout", encryptionStatus.stdout);
            m_logContext.log(cta::log::INFO, "Drive encryption enabled for this mount");
          } else {
            encryptionLogParams.add("encryption", "off");
            m_logContext.log(cta::log::INFO, "Drive encryption not enabled for this mount");
          }
        }
        m_stats.encryptionControlTime += timer.secs(cta::utils::Timer::resetCounter);
      }
372
      catch (cta::exception::Exception &ex) {
373
374
375
376
377
        cta::log::ScopedParamContainer params(m_logContext);
        params.add("ErrorMessage", ex.getMessage().str());
        m_logContext.log(cta::log::ERR, "Drive encryption could not be enabled for this mount.");
        throw;
      }
378
      currentErrorToCount = "Error_tapePositionForWrite";
379
      // Then we have to initialize the tape write session
380
      std::unique_ptr<castor::tape::tapeFile::WriteSession> writeSession(openWriteSession());
Victor Kotlyar's avatar
Victor Kotlyar committed
381
      m_stats.positionTime  += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
382
      {
Victor Kotlyar's avatar
Victor Kotlyar committed
383
        cta::log::ScopedParamContainer scoped(m_logContext);
384
        scoped.add("positionTime", m_stats.positionTime);
385
386
387
388
        scoped.add("useLbp", m_useLbp);
        scoped.add("detectedLbp", writeSession->isTapeWithLbp());

        if (!writeSession->isTapeWithLbp() && m_useLbp) {
Victor Kotlyar's avatar
Victor Kotlyar committed
389
          m_logContext.log(cta::log::INFO, "Tapserver started with LBP support but "
390
391
392
393
            "the tape without LBP label mounted");
        }
        switch(m_drive.getLbpToUse()) {
          case drive::lbpToUse::crc32cReadWrite:
Victor Kotlyar's avatar
Victor Kotlyar committed
394
            m_logContext.log(cta::log::INFO, "Write session initialised with LBP"
395
396
397
398
              " crc32c in ReadWrite mode, tape VID checked and drive positioned"
              " for writing");
            break;
          case drive::lbpToUse::disabled:
Victor Kotlyar's avatar
Victor Kotlyar committed
399
            m_logContext.log(cta::log::INFO, "Write session initialised without LBP"
400
401
402
              ", tape VID checked and drive positioned for writing");
            break;
          default:
Victor Kotlyar's avatar
Victor Kotlyar committed
403
            m_logContext.log(cta::log::ERR, "Write session initialised with "
404
405
406
              "unsupported LBP method, tape VID checked and drive positioned"
              " for writing");
        }
407
      }
408

409
410
      m_initialProcess.reportState(cta::tape::session::SessionState::Running,
        cta::tape::session::SessionType::Archive);
Eric Cano's avatar
Eric Cano committed
411
412
      uint64_t bytes=0;
      uint64_t files=0;
Victor Kotlyar's avatar
Victor Kotlyar committed
413
      m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
414
415
      // Tasks handle their error logging themselves.
      currentErrorToCount = "";
Daniele Kruse's avatar
Daniele Kruse committed
416
      std::unique_ptr<TapeWriteTask> task;   
417
      m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring,cta::nullopt, m_logContext); 
Eric Cano's avatar
Eric Cano committed
418
419
420
      while(1) {
        //get a task
        task.reset(m_tasks.pop());
Victor Kotlyar's avatar
Victor Kotlyar committed
421
        m_stats.waitInstructionsTime += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
422
423
424
425
        //if is the end
        if(NULL==task.get()) {      
          //we flush without asking
          tapeFlush("No more data to write on tape, unconditional flushing to the client",bytes,files,timer);
Victor Kotlyar's avatar
Victor Kotlyar committed
426
          m_stats.flushTime += timer.secs(cta::utils::Timer::resetCounter);
Victor Kotlyar's avatar
Victor Kotlyar committed
427
428
          cta::log::LogContext::ScopedParam sp0(m_logContext, cta::log::Param("tapeThreadDuration", totalTimer.secs()));
          m_logContext.log(cta::log::DEBUG, "writing data to tape has finished");
Eric Cano's avatar
Eric Cano committed
429
430
          break;
        }
431
        task->execute(*writeSession,m_reportPacker,m_watchdog,m_logContext,timer);
432
433
        // Add the tasks counts to the session's
        m_stats.add(task->getTaskStats());
434
        // Transmit the statistics to the watchdog thread
435
        m_watchdog.updateStatsWithoutDeliveryTime(m_stats);
Eric Cano's avatar
Eric Cano committed
436
437
438
439
440
441
        // Increase local flush counters (session counters are incremented by
        // the task)
        files++;
        bytes+=task->fileSize();
        //if one flush counter is above a threshold, then we flush
        if (files >= m_filesBeforeFlush || bytes >= m_bytesBeforeFlush) {
442
          currentErrorToCount = "Error_tapeFlush";
Eric Cano's avatar
Eric Cano committed
443
444
445
          tapeFlush("Normal flush because thresholds was reached",bytes,files,timer);
          files=0;
          bytes=0;
446
          currentErrorToCount = "";
Eric Cano's avatar
Eric Cano committed
447
448
449
        }
      } //end of while(1))
    }
450

Eric Cano's avatar
Eric Cano committed
451
452
    // The session completed successfully, and the cleaner (unmount) executed
    // at the end of the previous block. Log the results.
Victor Kotlyar's avatar
Victor Kotlyar committed
453
    cta::log::ScopedParamContainer params(m_logContext);
454
    params.add("status", "success");
455
    m_stats.totalTime = totalTimer.secs();
456
    m_stats.deliveryTime = m_stats.totalTime;
Victor Kotlyar's avatar
Victor Kotlyar committed
457
    logWithStats(cta::log::INFO, "Tape thread complete",params);
458
459
    // Report one last time the stats, after unloading/unmounting.
    m_watchdog.updateStats(m_stats);
460
461
    //end of session + log
    m_reportPacker.reportEndOfSession(m_logContext);
462
  } //end of try 
463
464
465
466
467
468
469
470
471
472
473
474
475
  catch(const cta::exception::Exception& e){
    //we end there because write session could not be opened 
    //or because a task failed or because flush failed
    
    // First off, indicate the problem to the task injector so it does not inject
    // more work in the pipeline
    // If the problem did not originate here, we just re-flag the error, and
    // this has no effect, but if we had a problem with a non-file operation
    // like mounting the tape, then we have to signal the problem to the disk
    // side and the task injector, which will trigger the end of session.
    m_injector->setErrorFlag();
    // We can still update the session stats one last time (unmount timings
    // should have been updated by the RAII cleaner/unmounter).
476
    m_watchdog.updateStatsWithoutDeliveryTime(m_stats);
477
478
479
480
481
482
    
    // If we reached the end of tape, this is not an error (ENOSPC)
    try {
      // If it's not the error we're looking for, we will go about our business
      // in the catch section. dynamic cast will throw, and we'll do ourselves
      // if the error code is not the one we want.
483
484
      const cta::exception::Errnum & en = 
        dynamic_cast<const cta::exception::Errnum &>(e);
485
486
487
488
489
      if(en.errorNumber()!= ENOSPC) {
        throw 0;
      }
      // This is indeed the end of the tape. Not an error.
      m_watchdog.setErrorCount("Info_tapeFilledUp",1);
490
      m_reportPacker.reportTapeFull(m_logContext);
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
    } catch (...) {
      // The error is not an ENOSPC, so it is, indeed, an error.
      // If we got here with a new error, currentErrorToCount will be non-empty,
      // and we will pass the error name to the watchdog.
      if(currentErrorToCount.size()) {
        m_watchdog.addToErrorCount(currentErrorToCount);
      }
    }
    
    //first empty all the tasks and circulate mem blocks
    while(1) {
      std::unique_ptr<TapeWriteTask>  task(m_tasks.pop());
      if(task.get()==NULL) {
        break;
      }
      task->circulateMemBlocks();
    }
    // Prepare the standard error codes for the session
    std::string errorMessage(e.getMessageValue());
510
    int errorCode(666);
511
512
513
    // Override if we got en ENOSPC error (end of tape)
    // This is 
    try {
514
515
      const cta::exception::Errnum & errnum = 
          dynamic_cast<const cta::exception::Errnum &> (e);
516
517
518
519
520
521
      if (ENOSPC == errnum.errorNumber()) {
        errorCode = ENOSPC;
        errorMessage = "End of migration due to tape full";
      }
    } catch (...) {}
    // then log the end of write thread
Victor Kotlyar's avatar
Victor Kotlyar committed
522
    cta::log::ScopedParamContainer params(m_logContext);
523
524
525
    params.add("status", "error")
          .add("ErrorMesage", errorMessage);
    m_stats.totalTime = totalTimer.secs();
Victor Kotlyar's avatar
Victor Kotlyar committed
526
    logWithStats(cta::log::INFO, "Tape thread complete",
527
            params);
528
    m_reportPacker.reportEndOfSessionWithErrors(errorMessage,errorCode, m_logContext);
529
  }      
530
}
531

532
533
534
//------------------------------------------------------------------------------
//logWithStats
//------------------------------------------------------------------------------
535
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::logWithStats(
Victor Kotlyar's avatar
Victor Kotlyar committed
536
int level,const std::string& msg, cta::log::ScopedParamContainer& params){
537
  params.add("type", "write")
538
        .add("tapeVid", m_volInfo.vid)
539
540
541
542
        .add("mountTime", m_stats.mountTime)
        .add("positionTime", m_stats.positionTime)
        .add("waitInstructionsTime", m_stats.waitInstructionsTime)
        .add("checksumingTime", m_stats.checksumingTime)
543
        .add("readWriteTime", m_stats.readWriteTime)
544
545
546
547
548
        .add("waitDataTime", m_stats.waitDataTime)
        .add("waitReportingTime", m_stats.waitReportingTime)
        .add("flushTime", m_stats.flushTime)
        .add("unloadTime", m_stats.unloadTime)
        .add("unmountTime", m_stats.unmountTime)
549
        .add("encryptionControlTime", m_stats.encryptionControlTime)
550
        .add("transferTime", m_stats.transferTime())
551
        .add("totalTime", m_stats.totalTime)
552
553
554
        .add("dataVolume", m_stats.dataVolume)
        .add("headerVolume", m_stats.headerVolume)
        .add("files", m_stats.filesCount)
555
556
557
558
        .add("payloadTransferSpeedMBps", m_stats.totalTime?1.0*m_stats.dataVolume
                /1000/1000/m_stats.totalTime:0.0)
        .add("driveTransferSpeedMBps", m_stats.totalTime?1.0*(m_stats.dataVolume+m_stats.headerVolume)
                /1000/1000/m_stats.totalTime:0.0);
559
  m_logContext.moveToTheEndIfPresent("status");
560
561
  m_logContext.log(level, msg);
}
562
563
564
565
566
567
568
569
570
571

//------------------------------------------------------------------------------
//logSCSIMetrics
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::logSCSIMetrics() {
  try {
    // mount general statistics
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    // get mount general stats
572
    std::map<std::string, uint64_t> scsi_write_metrics_hash = m_drive.getTapeWriteErrors();
573
574
575
576
577
578
579
580
    appendMetricsToScopedParams(scopedContainer, scsi_write_metrics_hash);
    std::map<std::string, uint32_t> scsi_nonmedium_metrics_hash = m_drive.getTapeNonMediumErrors();
    appendMetricsToScopedParams(scopedContainer, scsi_nonmedium_metrics_hash);
    logSCSIStats("Logging mount general statistics",
      scsi_write_metrics_hash.size() + scsi_nonmedium_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
581
    scoped.add("exceptionMessage", ex.getMessageValue());
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
    m_logContext.log(cta::log::ERR, "Exception in logging mount general statistics");
  }

  // drive statistics
  try {
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    // get drive stats
    std::map<std::string,float> scsi_quality_metrics_hash = m_drive.getQualityStats();
    appendMetricsToScopedParams(scopedContainer, scsi_quality_metrics_hash);
    std::map<std::string,uint32_t> scsi_drive_metrics_hash = m_drive.getDriveStats();
    appendMetricsToScopedParams(scopedContainer, scsi_drive_metrics_hash);
    logSCSIStats("Logging drive statistics",
      scsi_quality_metrics_hash.size()+scsi_drive_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
599
    scoped.add("exceptionMessage", ex.getMessageValue());
600
601
602
603
604
605
606
607
608
609
610
611
612
    m_logContext.log(cta::log::ERR, "Exception in logging drive statistics");
  }

  // volume statistics
  try {
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    std::map<std::string,uint32_t> scsi_metrics_hash = m_drive.getVolumeStats();
    appendMetricsToScopedParams(scopedContainer, scsi_metrics_hash);
    logSCSIStats("Logging volume statistics", scsi_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
613
    scoped.add("exceptionMessage", ex.getMessageValue());
614
615
616
    m_logContext.log(cta::log::ERR, "Exception in logging volume statistics");
  }
}