/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2003-2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
19
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
20

David COME's avatar
David COME committed
21
22
23
//------------------------------------------------------------------------------
//constructor
//------------------------------------------------------------------------------
24
// Builds the tape write thread object.
// drive, mc, tsr, volInfo, capUtils, lc, externalEncryptionKeyScript and
// tapeLoadTimeout are forwarded to the TapeSingleThreadInterface<TapeWriteTask>
// base; the remaining arguments are stored in members.
// m_lastFseq is initialised from -1 (setlastFseq() overwrites it later) and
// compression is switched on (m_compress = true).
castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeWriteSingleThread(
  castor::tape::tapeserver::drive::DriveInterface & drive,
  cta::mediachanger::MediaChangerFacade & mc,
  TapeServerReporter & tsr,
  MigrationWatchDog & mwd,
  const VolumeInfo& volInfo,
  cta::log::LogContext & lc,
  MigrationReportPacker & repPacker,
  cta::server::ProcessCap &capUtils,
  uint64_t filesBeforeFlush,
  uint64_t bytesBeforeFlush,
  const bool useLbp,
  const std::string & externalEncryptionKeyScript,
  const cta::ArchiveMount & archiveMount,
  const uint64_t tapeLoadTimeout)
  : TapeSingleThreadInterface<TapeWriteTask>(
      drive, mc, tsr, volInfo, capUtils, lc,
      externalEncryptionKeyScript, tapeLoadTimeout),
    m_filesBeforeFlush(filesBeforeFlush),
    m_bytesBeforeFlush(bytesBeforeFlush),
    m_drive(drive),
    m_reportPacker(repPacker),
    m_lastFseq(-1),
    m_compress(true),
    m_useLbp(useLbp),
    m_watchdog(mwd),
    m_archiveMount(archiveMount)
{
}
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

//------------------------------------------------------------------------------
//TapeCleaning::~TapeCleaning()
//------------------------------------------------------------------------------
// End-of-session cleanup executed when the TapeCleaning guard created in run()
// goes out of scope. In order, it:
//   1. tries to turn drive encryption off,
//   2. logs tape alerts and SCSI metrics (best effort),
//   3. unloads the tape (unless the library is in manual mode) and dismounts it,
//   4. on any failure marks the drive as down and counts the error,
//   5. always finishes the global status reporter.
castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeCleaning(){
  // Disable encryption (or at least try)
  try {
    if (m_this.m_encryptionControl.disable(m_this.m_drive))
      m_this.m_logContext.log(cta::log::INFO, "Turned encryption off before unmounting");
  } catch (const cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer scoped(m_this.m_logContext);
    scoped.add("exceptionError", ex.getMessageValue());
    m_this.m_logContext.log(cta::log::ERR, "Failed to turn off encryption before unmounting");
  }
  m_this.m_stats.encryptionControlTime += m_timer.secs(cta::utils::Timer::resetCounter);
  m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::CleaningUp, cta::nullopt, m_this.m_logContext);
  // Log (safely, exception-wise) the tape alerts (if any) at the end of the session
  try { m_this.logTapeAlerts(); } catch (...) {}
  // Log (safely, exception-wise) the tape SCSI metrics at the end of the session
  try { m_this.logSCSIMetrics(); } catch(...) {}
  m_this.m_initialProcess.reportState(cta::tape::session::SessionState::Unmounting,
      cta::tape::session::SessionType::Archive);
  // This out-of-try-catch variable allows us to record the stage of the
  // process we're in, and to count the error if it occurs.
  // We will not record errors for an empty string. This will allow us to
  // prevent counting where error happened upstream.
  std::string currentErrorToCount = "Error_tapeUnload";
  try{
    // Do the final cleanup
    // First check that a tape is actually present in the drive. We can get here
    // after failing to mount (library error) in which case there is nothing to
    // do (and trying to unmount will only lead to a failure.)
    const uint32_t waitMediaInDriveTimeout = m_this.m_tapeLoadTimeout;
    try {
      m_this.m_drive.waitUntilReady(waitMediaInDriveTimeout);
    } catch (cta::exception::TimeOut &) {}
    if (!m_this.m_drive.hasTapeInPlace()) {
      // Nothing to unload nor dismount: go straight to the final finish() call.
      m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: No tape to unload");
    } else {
      // in the special case of a "manual" mode tape, we should skip the unload too.
      if (cta::mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.librarySlot().getLibraryType()) {
        m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unloading,cta::nullopt, m_this.m_logContext);
        m_this.m_drive.unloadTape();
        m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: Tape unloaded");
      } else {
        m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: Tape NOT unloaded (manual mode)");
      }
      m_this.m_stats.unloadTime += m_timer.secs(cta::utils::Timer::resetCounter);
      // And return the tape to the library
      // In case of manual mode, this will be filtered by the rmc daemon
      // (which will do nothing)
      currentErrorToCount = "Error_tapeDismount";
      m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, cta::nullopt, m_this.m_logContext);
      m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.librarySlot());
      m_this.m_drive.disableLogicalBlockProtection();
      m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, cta::nullopt, m_this.m_logContext);
      m_this.m_stats.unmountTime += m_timer.secs(cta::utils::Timer::resetCounter);
      m_this.m_logContext.log(cta::log::INFO, cta::mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.librarySlot().getLibraryType() ?
        "TapeWriteSingleThread : tape unmounted":"TapeWriteSingleThread : tape NOT unmounted (manual mode)");
      m_this.m_initialProcess.reportState(cta::tape::session::SessionState::ShuttingDown,
        cta::tape::session::SessionType::Archive);
      m_this.m_stats.waitReportingTime += m_timer.secs(cta::utils::Timer::resetCounter);
    }
  }
  catch(const cta::exception::Exception& ex){
    // Notify something failed during the cleaning
    m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
    const int logLevel = cta::log::ERR;
    const std::string errorMsg = "Exception in TapeWriteSingleThread-TapeCleaning. Putting the drive down.";
    cta::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel,errorMsg);
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down,reason, m_this.m_logContext);
    cta::log::ScopedParamContainer scoped(m_this.m_logContext);
    scoped.add("exceptionMessage", ex.getMessageValue());
    m_this.m_logContext.log(logLevel, errorMsg);
    // As we do not throw exceptions from here, the watchdog signalling has
    // to occur from here.
    try {
      if (currentErrorToCount.size()) {
        m_this.m_watchdog.addToErrorCount(currentErrorToCount);
      }
    } catch (...) {}
  } catch (...) {
    // Notify something failed during the cleaning
    const int logLevel = cta::log::ERR;
    const std::string errorMsg = "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape. Putting the drive down.";
    cta::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel,errorMsg);
    m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
    m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down,reason,m_this.m_logContext);
    m_this.m_logContext.log(logLevel,errorMsg);
    try {
      if (currentErrorToCount.size()) {
        m_this.m_watchdog.addToErrorCount(currentErrorToCount);
      }
    } catch (...) {}
  }
  //then we terminate the global status reporter
  m_this.m_initialProcess.finish();
}

David COME's avatar
David COME committed
148
149
150
//------------------------------------------------------------------------------
//setlastFseq
//------------------------------------------------------------------------------
151
152
153
154
// Records the fseq value later handed to the WriteSession constructor
// (see openWriteSession()).
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::setlastFseq(uint64_t lastFseq) {
  m_lastFseq = lastFseq;
}
David COME's avatar
David COME committed
155
156
157
//------------------------------------------------------------------------------
//openWriteSession
//------------------------------------------------------------------------------
158
std::unique_ptr<castor::tape::tapeFile::WriteSession> 
159
castor::tape::tapeserver::daemon::TapeWriteSingleThread::openWriteSession() {
Victor Kotlyar's avatar
Victor Kotlyar committed
160
161
  using cta::log::LogContext;
  using cta::log::Param;
162
163
  typedef LogContext::ScopedParam ScopedParam;
  
164
  std::unique_ptr<castor::tape::tapeFile::WriteSession> writeSession;
165
166
167
  
  ScopedParam sp[]={
    ScopedParam(m_logContext, Param("lastFseq", m_lastFseq)),
168
    ScopedParam(m_logContext, Param("compression", m_compress)),
169
    ScopedParam(m_logContext, Param("useLbp", m_useLbp)),
170
171
172
173
  };
  tape::utils::suppresUnusedVariable(sp);
  try {
    writeSession.reset(
174
175
    new castor::tape::tapeFile::WriteSession(m_drive, m_volInfo, m_lastFseq,
      m_compress, m_useLbp)
176
177
    );
  }
178
  catch (cta::exception::Exception & e) {
179
    ScopedParam sp0(m_logContext, Param("ErrorMessage", e.getMessageValue()));
Victor Kotlyar's avatar
Victor Kotlyar committed
180
    m_logContext.log(cta::log::ERR, "Failed to start tape write session");
181
182
183
184
185
186
    // TODO: log and unroll the session
    // TODO: add an unroll mode to the tape read task. (Similar to exec, but pushing blocks marked in error)
    throw;
  }
  return writeSession;
}
David COME's avatar
David COME committed
187
188
189
//------------------------------------------------------------------------------
//tapeFlush
//------------------------------------------------------------------------------
190
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
Eric Cano's avatar
Eric Cano committed
191
tapeFlush(const std::string& message,uint64_t bytes,uint64_t files,
Victor Kotlyar's avatar
Victor Kotlyar committed
192
  cta::utils::Timer & timer)
Eric Cano's avatar
Eric Cano committed
193
{
194
  m_drive.flush();
Victor Kotlyar's avatar
Victor Kotlyar committed
195
  double flushTime = timer.secs(cta::utils::Timer::resetCounter);
Victor Kotlyar's avatar
Victor Kotlyar committed
196
  cta::log::ScopedParamContainer params(m_logContext);
Eric Cano's avatar
Eric Cano committed
197
198
  params.add("files", files)
        .add("bytes", bytes)
199
        .add("flushTime", flushTime);
Victor Kotlyar's avatar
Victor Kotlyar committed
200
  m_logContext.log(cta::log::INFO,message);
Eric Cano's avatar
Eric Cano committed
201
  m_stats.flushTime += flushTime;
202
203
  

204
  m_reportPacker.reportFlush(m_drive.getCompression(), m_logContext);
205
  m_drive.clearCompressionStats();
206
}
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237

//------------------------------------------------------------------------
//   logAndCheckTapeAlertsForWrite
//------------------------------------------------------------------------------
bool castor::tape::tapeserver::daemon::TapeWriteSingleThread::
logAndCheckTapeAlertsForWrite() {
  std::vector<uint16_t> tapeAlertCodes = m_drive.getTapeAlertCodes();
  if (tapeAlertCodes.empty()) return false;
  size_t alertNumber = 0;
  // Log tape alerts in the logs.
  std::vector<std::string> tapeAlerts = m_drive.getTapeAlerts(tapeAlertCodes);
  for (std::vector<std::string>::iterator ta=tapeAlerts.begin();
          ta!=tapeAlerts.end();ta++)
  {
    cta::log::ScopedParamContainer params(m_logContext);
    params.add("tapeAlert",*ta)
          .add("tapeAlertNumber", alertNumber++)
          .add("tapeAlertCount", tapeAlerts.size());
    m_logContext.log(cta::log::WARNING, "Tape alert detected");
  }
  // Add tape alerts in the tape log parameters
  std::vector<std::string> tapeAlertsCompact = 
    m_drive.getTapeAlertsCompact(tapeAlertCodes);
  for (std::vector<std::string>::iterator tac=tapeAlertsCompact.begin();
          tac!=tapeAlertsCompact.end();tac++)
  {
    countTapeLogError(std::string("Error_")+*tac);
  }
  return(m_drive.tapeAlertsCriticalForWrite(tapeAlertCodes));
}

238
239
240
241
242
243
244
//------------------------------------------------------------------------------
//   isTapeWritable
//-----------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::
isTapeWritable() const {
// check that drive is not write protected
      if(m_drive.isWriteProtected()) {   
245
        cta::exception::Exception ex;
246
247
248
249
250
        ex.getMessage() <<
                "End session with error. Drive is write protected. Aborting labelling...";
        throw ex;
      }
}
251
252
253
254
255

//-----------------------------------------------------------------------------
// mountTypeToString
//-----------------------------------------------------------------------------
const char *castor::tape::tapeserver::daemon::TapeWriteSingleThread::
256
  mountTypeToString(const cta::common::dataStructures::MountType mountType) const throw() {
257
  switch(mountType) {
258
  case cta::common::dataStructures::MountType::Retrieve: return "Retrieve";
259
260
  case cta::common::dataStructures::MountType::ArchiveForUser: return "ArchiveForUser";
  case cta::common::dataStructures::MountType::ArchiveForRepack: return "ArchiveForRepack";
261
  case cta::common::dataStructures::MountType::Label: return "Label";
262
263
264
265
  default                      : return "UNKNOWN";
  }
}

266
//------------------------------------------------------------------------------
David COME's avatar
David COME committed
267
268
269
//------------------------------------------------------------------------------
//run
//------------------------------------------------------------------------------
270
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
Victor Kotlyar's avatar
Victor Kotlyar committed
271
  cta::log::ScopedParamContainer threadGlobalParams(m_logContext);
272
  threadGlobalParams.add("thread", "TapeWrite");
Victor Kotlyar's avatar
Victor Kotlyar committed
273
  cta::utils::Timer timer, totalTimer;
274
275
276
277
  // This out-of-try-catch variables allows us to record the stage of the 
  // process we're in, and to count the error if it occurs.
  // We will not record errors for an empty string. This will allow us to
  // prevent counting where error happened upstream.
278
  std::string currentErrorToCount = "Error_tapeMountForWrite";
279
280
  try
  {
281
    // Report the parameters of the session to the main thread
Victor Kotlyar's avatar
Victor Kotlyar committed
282
    typedef cta::log::Param Param;
283
    m_watchdog.addParameter(Param("tapeVid", m_volInfo.vid));
284
    m_watchdog.addParameter(Param("mountType", mountTypeToString(m_volInfo.mountType)));
285
286
    m_watchdog.addParameter(Param("mountId", m_volInfo.mountId));
    m_watchdog.addParameter(Param("volReqId", m_volInfo.mountId));
287
    
288
289
    m_watchdog.addParameter(Param("tapeDrive",m_drive.config.unitName));
    m_watchdog.addParameter(Param("vendor",m_archiveMount.getVendor()));
290
291
292
293
    m_watchdog.addParameter(Param("vo",m_archiveMount.getVo()));
    m_watchdog.addParameter(Param("mediaType",m_archiveMount.getMediaType()));
    m_watchdog.addParameter(Param("tapePool",m_archiveMount.getPoolName()));
    m_watchdog.addParameter(Param("logicalLibrary",m_drive.config.logicalLibrary));
294
295
    m_watchdog.addParameter(Param("capacityInBytes",m_archiveMount.getCapacityInBytes()));
      
296
297
298
299
    // Set the tape thread time in the watchdog for total time estimation in case
    // of crash
    m_watchdog.updateThreadTimer(totalTimer);
    
300
    //pair of brackets to create an artificial scope for the tape cleaning
Eric Cano's avatar
Eric Cano committed
301
302
    {
      //log and notify
Victor Kotlyar's avatar
Victor Kotlyar committed
303
      m_logContext.log(cta::log::INFO, "Starting tape write thread");
304
      
Eric Cano's avatar
Eric Cano committed
305
306
307
      // The tape will be loaded 
      // it has to be unloaded, unmounted at all cost -> RAII
      // will also take care of the TapeServerReporter
308
      // 
Eric Cano's avatar
Eric Cano committed
309
      TapeCleaning cleaner(*this, timer);
310
      m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Mounting,cta::nullopt, m_logContext);
Eric Cano's avatar
Eric Cano committed
311
312
      // Before anything, the tape should be mounted
      // This call does the logging of the mount
313
314
315
316
317
318
319
320
321
      cta::log::ScopedParamContainer params(m_logContext);
      params.add("vo",m_archiveMount.getVo());
      params.add("mediaType",m_archiveMount.getMediaType());
      params.add("tapePool",m_archiveMount.getPoolName());
      params.add("logicalLibrary",m_drive.config.logicalLibrary);
      params.add("mountType",mountTypeToString(m_volInfo.mountType));
      params.add("vendor",m_archiveMount.getVendor());
      params.add("capacityInBytes",m_archiveMount.getCapacityInBytes());
      m_logContext.log(cta::log::INFO, "Tape session started");
322
      mountTapeReadWrite();
323
      currentErrorToCount = "Error_tapeLoad";
324
      cta::utils::Timer tapeLoadTimer;
Eric Cano's avatar
Eric Cano committed
325
      waitForDrive();
326
      double tapeLoadTime = tapeLoadTimer.secs();
327
      currentErrorToCount = "Error_checkingTapeAlert";
328
329
330
      if(logAndCheckTapeAlertsForWrite()) {
        throw cta::exception::Exception("Aborting migration session in"
          " presence of critical tape alerts");
331
      }
332

333
      currentErrorToCount = "Error_tapeNotWriteable";
334
335
      isTapeWritable();
      
Victor Kotlyar's avatar
Victor Kotlyar committed
336
      m_stats.mountTime += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
337
      {
Victor Kotlyar's avatar
Victor Kotlyar committed
338
        cta::log::ScopedParamContainer scoped(m_logContext);
339
        scoped.add("mountTime", m_stats.mountTime);
340
        scoped.add("tapeLoadTime",tapeLoadTime);
Victor Kotlyar's avatar
Victor Kotlyar committed
341
        m_logContext.log(cta::log::INFO, "Tape mounted and drive ready");
342
      }
343
      m_archiveMount.setTapeMounted(m_logContext);
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
      try {
        currentErrorToCount = "Error_tapeEncryptionEnable";
        // We want those scoped params to last for the whole mount.
        // This will allow each written file to be logged with its encryption
        // status:
        cta::log::ScopedParamContainer encryptionLogParams(m_logContext);
        {
          auto encryptionStatus = m_encryptionControl.enable(m_drive, m_volInfo.vid,
                                                             EncryptionControl::SetTag::SET_TAG);
 
          if (encryptionStatus.on) {
            encryptionLogParams.add("encryption", "on")
              .add("encryptionKey", encryptionStatus.keyName)
              .add("stdout", encryptionStatus.stdout);
            m_logContext.log(cta::log::INFO, "Drive encryption enabled for this mount");
          } else {
            encryptionLogParams.add("encryption", "off");
            m_logContext.log(cta::log::INFO, "Drive encryption not enabled for this mount");
          }
        }
        m_stats.encryptionControlTime += timer.secs(cta::utils::Timer::resetCounter);
      }
366
      catch (cta::exception::Exception &ex) {
367
368
369
370
371
        cta::log::ScopedParamContainer params(m_logContext);
        params.add("ErrorMessage", ex.getMessage().str());
        m_logContext.log(cta::log::ERR, "Drive encryption could not be enabled for this mount.");
        throw;
      }
372
      currentErrorToCount = "Error_tapePositionForWrite";
373
      // Then we have to initialize the tape write session
374
      std::unique_ptr<castor::tape::tapeFile::WriteSession> writeSession(openWriteSession());
Victor Kotlyar's avatar
Victor Kotlyar committed
375
      m_stats.positionTime  += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
376
      {
Victor Kotlyar's avatar
Victor Kotlyar committed
377
        cta::log::ScopedParamContainer scoped(m_logContext);
378
        scoped.add("positionTime", m_stats.positionTime);
379
380
381
382
        scoped.add("useLbp", m_useLbp);
        scoped.add("detectedLbp", writeSession->isTapeWithLbp());

        if (!writeSession->isTapeWithLbp() && m_useLbp) {
Victor Kotlyar's avatar
Victor Kotlyar committed
383
          m_logContext.log(cta::log::INFO, "Tapserver started with LBP support but "
384
385
386
387
            "the tape without LBP label mounted");
        }
        switch(m_drive.getLbpToUse()) {
          case drive::lbpToUse::crc32cReadWrite:
Victor Kotlyar's avatar
Victor Kotlyar committed
388
            m_logContext.log(cta::log::INFO, "Write session initialised with LBP"
389
390
391
392
              " crc32c in ReadWrite mode, tape VID checked and drive positioned"
              " for writing");
            break;
          case drive::lbpToUse::disabled:
Victor Kotlyar's avatar
Victor Kotlyar committed
393
            m_logContext.log(cta::log::INFO, "Write session initialised without LBP"
394
395
396
              ", tape VID checked and drive positioned for writing");
            break;
          default:
Victor Kotlyar's avatar
Victor Kotlyar committed
397
            m_logContext.log(cta::log::ERR, "Write session initialised with "
398
399
400
              "unsupported LBP method, tape VID checked and drive positioned"
              " for writing");
        }
401
      }
402

403
404
      m_initialProcess.reportState(cta::tape::session::SessionState::Running,
        cta::tape::session::SessionType::Archive);
Eric Cano's avatar
Eric Cano committed
405
406
      uint64_t bytes=0;
      uint64_t files=0;
Victor Kotlyar's avatar
Victor Kotlyar committed
407
      m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
408
409
      // Tasks handle their error logging themselves.
      currentErrorToCount = "";
Daniele Kruse's avatar
Daniele Kruse committed
410
      std::unique_ptr<TapeWriteTask> task;   
411
      m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring,cta::nullopt, m_logContext); 
Eric Cano's avatar
Eric Cano committed
412
413
414
      while(1) {
        //get a task
        task.reset(m_tasks.pop());
Victor Kotlyar's avatar
Victor Kotlyar committed
415
        m_stats.waitInstructionsTime += timer.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
416
417
418
419
        //if is the end
        if(NULL==task.get()) {      
          //we flush without asking
          tapeFlush("No more data to write on tape, unconditional flushing to the client",bytes,files,timer);
Victor Kotlyar's avatar
Victor Kotlyar committed
420
          m_stats.flushTime += timer.secs(cta::utils::Timer::resetCounter);
Victor Kotlyar's avatar
Victor Kotlyar committed
421
422
          cta::log::LogContext::ScopedParam sp0(m_logContext, cta::log::Param("tapeThreadDuration", totalTimer.secs()));
          m_logContext.log(cta::log::DEBUG, "writing data to tape has finished");
Eric Cano's avatar
Eric Cano committed
423
424
          break;
        }
425
        task->execute(*writeSession,m_reportPacker,m_watchdog,m_logContext,timer);
426
427
        // Add the tasks counts to the session's
        m_stats.add(task->getTaskStats());
428
        // Transmit the statistics to the watchdog thread
429
        m_watchdog.updateStatsWithoutDeliveryTime(m_stats);
Eric Cano's avatar
Eric Cano committed
430
431
432
433
434
435
        // Increase local flush counters (session counters are incremented by
        // the task)
        files++;
        bytes+=task->fileSize();
        //if one flush counter is above a threshold, then we flush
        if (files >= m_filesBeforeFlush || bytes >= m_bytesBeforeFlush) {
436
          currentErrorToCount = "Error_tapeFlush";
Eric Cano's avatar
Eric Cano committed
437
438
439
          tapeFlush("Normal flush because thresholds was reached",bytes,files,timer);
          files=0;
          bytes=0;
440
          currentErrorToCount = "";
Eric Cano's avatar
Eric Cano committed
441
442
443
        }
      } //end of while(1))
    }
444

Eric Cano's avatar
Eric Cano committed
445
446
    // The session completed successfully, and the cleaner (unmount) executed
    // at the end of the previous block. Log the results.
Victor Kotlyar's avatar
Victor Kotlyar committed
447
    cta::log::ScopedParamContainer params(m_logContext);
448
    params.add("status", "success");
449
    m_stats.totalTime = totalTimer.secs();
450
    m_stats.deliveryTime = m_stats.totalTime;
Victor Kotlyar's avatar
Victor Kotlyar committed
451
    logWithStats(cta::log::INFO, "Tape thread complete",params);
452
453
    // Report one last time the stats, after unloading/unmounting.
    m_watchdog.updateStats(m_stats);
454
455
    //end of session + log
    m_reportPacker.reportEndOfSession(m_logContext);
456
  } //end of try 
457
458
459
460
461
462
463
464
465
466
467
468
469
  catch(const cta::exception::Exception& e){
    //we end there because write session could not be opened 
    //or because a task failed or because flush failed
    
    // First off, indicate the problem to the task injector so it does not inject
    // more work in the pipeline
    // If the problem did not originate here, we just re-flag the error, and
    // this has no effect, but if we had a problem with a non-file operation
    // like mounting the tape, then we have to signal the problem to the disk
    // side and the task injector, which will trigger the end of session.
    m_injector->setErrorFlag();
    // We can still update the session stats one last time (unmount timings
    // should have been updated by the RAII cleaner/unmounter).
470
    m_watchdog.updateStatsWithoutDeliveryTime(m_stats);
471
472
473
474
475
476
    
    // If we reached the end of tape, this is not an error (ENOSPC)
    try {
      // If it's not the error we're looking for, we will go about our business
      // in the catch section. dynamic cast will throw, and we'll do ourselves
      // if the error code is not the one we want.
477
478
      const cta::exception::Errnum & en = 
        dynamic_cast<const cta::exception::Errnum &>(e);
479
480
481
482
483
      if(en.errorNumber()!= ENOSPC) {
        throw 0;
      }
      // This is indeed the end of the tape. Not an error.
      m_watchdog.setErrorCount("Info_tapeFilledUp",1);
484
      m_reportPacker.reportTapeFull(m_logContext);
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
    } catch (...) {
      // The error is not an ENOSPC, so it is, indeed, an error.
      // If we got here with a new error, currentErrorToCount will be non-empty,
      // and we will pass the error name to the watchdog.
      if(currentErrorToCount.size()) {
        m_watchdog.addToErrorCount(currentErrorToCount);
      }
    }
    
    //first empty all the tasks and circulate mem blocks
    while(1) {
      std::unique_ptr<TapeWriteTask>  task(m_tasks.pop());
      if(task.get()==NULL) {
        break;
      }
      task->circulateMemBlocks();
    }
    // Prepare the standard error codes for the session
    std::string errorMessage(e.getMessageValue());
504
    int errorCode(666);
505
506
507
    // Override if we got en ENOSPC error (end of tape)
    // This is 
    try {
508
509
      const cta::exception::Errnum & errnum = 
          dynamic_cast<const cta::exception::Errnum &> (e);
510
511
512
513
514
515
      if (ENOSPC == errnum.errorNumber()) {
        errorCode = ENOSPC;
        errorMessage = "End of migration due to tape full";
      }
    } catch (...) {}
    // then log the end of write thread
Victor Kotlyar's avatar
Victor Kotlyar committed
516
    cta::log::ScopedParamContainer params(m_logContext);
517
518
519
    params.add("status", "error")
          .add("ErrorMesage", errorMessage);
    m_stats.totalTime = totalTimer.secs();
Victor Kotlyar's avatar
Victor Kotlyar committed
520
    logWithStats(cta::log::INFO, "Tape thread complete",
521
            params);
522
    m_reportPacker.reportEndOfSessionWithErrors(errorMessage,errorCode, m_logContext);
523
  }      
524
}
525

526
527
528
//------------------------------------------------------------------------------
//logWithStats
//------------------------------------------------------------------------------
529
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::logWithStats(
Victor Kotlyar's avatar
Victor Kotlyar committed
530
int level,const std::string& msg, cta::log::ScopedParamContainer& params){
531
  params.add("type", "write")
532
        .add("tapeVid", m_volInfo.vid)
533
534
535
536
        .add("mountTime", m_stats.mountTime)
        .add("positionTime", m_stats.positionTime)
        .add("waitInstructionsTime", m_stats.waitInstructionsTime)
        .add("checksumingTime", m_stats.checksumingTime)
537
        .add("readWriteTime", m_stats.readWriteTime)
538
539
540
541
542
        .add("waitDataTime", m_stats.waitDataTime)
        .add("waitReportingTime", m_stats.waitReportingTime)
        .add("flushTime", m_stats.flushTime)
        .add("unloadTime", m_stats.unloadTime)
        .add("unmountTime", m_stats.unmountTime)
543
        .add("encryptionControlTime", m_stats.encryptionControlTime)
544
        .add("transferTime", m_stats.transferTime())
545
        .add("totalTime", m_stats.totalTime)
546
547
548
        .add("dataVolume", m_stats.dataVolume)
        .add("headerVolume", m_stats.headerVolume)
        .add("files", m_stats.filesCount)
549
550
551
552
        .add("payloadTransferSpeedMBps", m_stats.totalTime?1.0*m_stats.dataVolume
                /1000/1000/m_stats.totalTime:0.0)
        .add("driveTransferSpeedMBps", m_stats.totalTime?1.0*(m_stats.dataVolume+m_stats.headerVolume)
                /1000/1000/m_stats.totalTime:0.0);
553
  m_logContext.moveToTheEndIfPresent("status");
554
555
  m_logContext.log(level, msg);
}
556
557
558
559
560
561
562
563
564
565

//------------------------------------------------------------------------------
//logSCSIMetrics
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeWriteSingleThread::logSCSIMetrics() {
  try {
    // mount general statistics
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    // get mount general stats
566
    std::map<std::string, uint64_t> scsi_write_metrics_hash = m_drive.getTapeWriteErrors();
567
568
569
570
571
572
573
574
    appendMetricsToScopedParams(scopedContainer, scsi_write_metrics_hash);
    std::map<std::string, uint32_t> scsi_nonmedium_metrics_hash = m_drive.getTapeNonMediumErrors();
    appendMetricsToScopedParams(scopedContainer, scsi_nonmedium_metrics_hash);
    logSCSIStats("Logging mount general statistics",
      scsi_write_metrics_hash.size() + scsi_nonmedium_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
575
    scoped.add("exceptionMessage", ex.getMessageValue());
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
    m_logContext.log(cta::log::ERR, "Exception in logging mount general statistics");
  }

  // drive statistics
  try {
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    // get drive stats
    std::map<std::string,float> scsi_quality_metrics_hash = m_drive.getQualityStats();
    appendMetricsToScopedParams(scopedContainer, scsi_quality_metrics_hash);
    std::map<std::string,uint32_t> scsi_drive_metrics_hash = m_drive.getDriveStats();
    appendMetricsToScopedParams(scopedContainer, scsi_drive_metrics_hash);
    logSCSIStats("Logging drive statistics",
      scsi_quality_metrics_hash.size()+scsi_drive_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
593
    scoped.add("exceptionMessage", ex.getMessageValue());
594
595
596
597
598
599
600
601
602
603
604
605
606
    m_logContext.log(cta::log::ERR, "Exception in logging drive statistics");
  }

  // volume statistics
  try {
    cta::log::ScopedParamContainer scopedContainer(m_logContext);
    appendDriveAndTapeInfoToScopedParams(scopedContainer);
    std::map<std::string,uint32_t> scsi_metrics_hash = m_drive.getVolumeStats();
    appendMetricsToScopedParams(scopedContainer, scsi_metrics_hash);
    logSCSIStats("Logging volume statistics", scsi_metrics_hash.size());
  }
  catch (const cta::exception::Exception &ex) {
    cta::log::ScopedParamContainer scoped(m_logContext);
607
    scoped.add("exceptionMessage", ex.getMessageValue());
608
609
610
    m_logContext.log(cta::log::ERR, "Exception in logging volume statistics");
  }
}