TapeWriteTask.cpp 17.7 KB
Newer Older
1
2
/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2003-2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
17

18
#include "common/exception/Errnum.hpp"
19
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
David COME's avatar
David COME committed
20
#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
21
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
22
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
23
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
24
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
25
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
26
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
27
#include "castor/tape/tapeserver/daemon/ErrorFlag.hpp"
28
29
#include "castor/tape/tapeserver/file/File.hpp" 
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
30
#include "common/exception/Exception.hpp"
31

32
33
34
35
36
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {

37
38
39
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
40
  /**
   * Builds a write task for one archive job.
   * @param blockCount number of memory blocks the task's FIFO is sized for
   * @param archiveJob the job to write; ownership is taken by m_archiveJob
   * @param mm memory manager the FIFO registers with as a client
   * @param errorFlag session-wide error flag, set by this task on failure
   */
  TapeWriteTask::TapeWriteTask(int blockCount, cta::ArchiveJob *archiveJob,
          MigrationMemoryManager& mm,cta::threading::AtomicFlag& errorFlag):
    m_archiveJob(archiveJob), m_memManager(mm), m_fifo(blockCount),
    m_blockCount(blockCount), m_errorFlag(errorFlag),
    // Copies are read straight from the constructor argument (the very object
    // m_archiveJob now owns); they remain usable for logging after the job
    // itself has been handed over to the report packer.
    m_archiveFile(archiveJob->archiveFile), m_tapeFile(archiveJob->tapeFile),
    m_srcURL(archiveJob->srcURL)
  {
    // The FIFO only becomes a memory-manager client (and so receives memory
    // blocks) when there is data to move: a zero length file must not register.
    const bool hasPayload = (archiveJob->archiveFile.fileSize != 0);
    if (hasPayload) {
      mm.addClient(&m_fifo);
    }
  }
51
52
53
//------------------------------------------------------------------------------
// fileSize
//------------------------------------------------------------------------------
54
  uint64_t TapeWriteTask::fileSize() { 
55
    return m_archiveFile.fileSize; 
56
  }
57
58
59
//------------------------------------------------------------------------------
// execute
//------------------------------------------------------------------------------  
60
   void TapeWriteTask::execute(castor::tape::tapeFile::WriteSession & session,
61
           MigrationReportPacker & reportPacker, MigrationWatchDog & watchdog,
Victor Kotlyar's avatar
Victor Kotlyar committed
62
63
64
65
           cta::log::LogContext&  lc, cta::utils::Timer & timer) {
    using cta::log::LogContext;
    using cta::log::Param;
    using cta::log::ScopedParamContainer;
66
67
    // Add to our logs the informations on the file
    ScopedParamContainer params(lc);
68
69
70
    params.add("fileId",m_archiveJob->archiveFile.archiveFileID)
          .add("fileSize",m_archiveJob->archiveFile.fileSize)
          .add("fSeq",m_archiveJob->tapeFile.fSeq)
71
          .add("diskURL",m_archiveJob->srcURL);
72
    
73
74
    // We will clock the stats for the file itself, and eventually add those
    // stats to the session's.
Victor Kotlyar's avatar
Victor Kotlyar committed
75
    cta::utils::Timer localTime;
David COME's avatar
David COME committed
76
    unsigned long ckSum = Payload::zeroAdler32();
77
    uint32_t memBlockId  = 0;
78
    
79
80
81
82
    // This out-of-try-catch variables allows us to record the stage of the 
    // process we're in, and to count the error if it occurs.
    // We will not record errors for an empty string. This will allow us to
    // prevent counting where error happened upstream.
83
    std::string currentErrorToCount = "Error_tapeFSeqOutOfSequenceForWrite";
84
    session.validateNextFSeq(m_archiveJob->tapeFile.fSeq);
David COME's avatar
David COME committed
85
    try {
86
      //try to open the session
87
      currentErrorToCount = "Error_tapeWriteHeader";
88
      watchdog.notifyBeginNewJob(m_archiveJob->archiveFile.archiveFileID, m_archiveJob->tapeFile.fSeq);
89
      std::unique_ptr<castor::tape::tapeFile::WriteFile> output(openWriteFile(session,lc));
90
      m_LBPMode = output->getLBPMode();
Victor Kotlyar's avatar
Victor Kotlyar committed
91
      m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
92
      m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile;
93
94
      // We are not error sources here until we actually write.
      currentErrorToCount = "";
95
      bool firstBlock = true;
David COME's avatar
David COME committed
96
      while(!m_fifo.finished()) {
Eric Cano's avatar
WIP    
Eric Cano committed
97
        MemBlock* const mb = m_fifo.popDataBlock();
Victor Kotlyar's avatar
Victor Kotlyar committed
98
        m_taskStats.waitDataTime += timer.secs(cta::utils::Timer::resetCounter);
99
        AutoReleaseBlock<MigrationMemoryManager> releaser(mb,m_memManager);
100
        
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
        // Special treatment for 1st block. If disk failed to provide anything, we can skip the file
        // by leaving a placeholder on the tape (at minimal tape space cost), so we can continue
        // the tape session (and save a tape mount!).
        if (firstBlock && mb->isFailed()) {
          currentErrorToCount = "Error_tapeWriteData";
          const char blank[]="This file intentionally left blank: leaving placeholder after failing to read from disk.";
          output->write(blank, sizeof(blank));
          m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
          watchdog.notify(sizeof(blank));
          currentErrorToCount = "Error_tapeWriteTrailer";
          output->close();
          currentErrorToCount = "";
          // Possibly failing writes are finished. We can continue this in catch for skip. outside of the loop.
          throw Skip(mb->errorMsg());
        }
        firstBlock = false;
        
118
        //will throw (thus exiting the loop) if something is wrong
119
        checkErrors(mb,memBlockId,lc);
120
        
121
        ckSum =  mb->m_payload.adler32(ckSum);
Victor Kotlyar's avatar
Victor Kotlyar committed
122
        m_taskStats.checksumingTime += timer.secs(cta::utils::Timer::resetCounter);
123
        currentErrorToCount = "Error_tapeWriteData";
124
        mb->m_payload.write(*output);
125
        currentErrorToCount = "";
126
        
Victor Kotlyar's avatar
Victor Kotlyar committed
127
        m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
128
        m_taskStats.dataVolume += mb->m_payload.size();
129
        watchdog.notify(mb->m_payload.size());
130
        ++memBlockId;
131
      }
132
      
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
      // If, after the FIFO is finished, we are still in the first block, we are in the presence of a 0-length file.
      // This also requires a placeholder.
      if (firstBlock) {
        currentErrorToCount = "Error_tapeWriteData";
        const char blank[]="This file intentionally left blank: zero-length file cannot be recorded to tape.";
        output->write(blank, sizeof(blank));
        m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
        watchdog.notify(sizeof(blank));
        currentErrorToCount = "Error_tapeWriteTrailer";
        output->close();
        currentErrorToCount = "";
        // Possibly failing writes are finished. We can continue this in catch for skip. outside of the loop.
        throw Skip("In TapeWriteTask::execute(): inserted a placeholder for zero length file.");
      }
      
148
149
      //finish the writing of the file on tape
      //put the trailer
150
      currentErrorToCount = "Error_tapeWriteTrailer";
151
      output->close();
152
      currentErrorToCount = "";
Victor Kotlyar's avatar
Victor Kotlyar committed
153
      m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
154
      m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile;
155
      m_taskStats.filesCount ++;
156
      // Record the fSeq in the tape session
157
      session.reportWrittenFSeq(m_archiveJob->tapeFile.fSeq);
158
      m_archiveJob->tapeFile.checksumBlob.insert(cta::checksum::ADLER32, ckSum);
159
      m_archiveJob->tapeFile.fileSize = m_taskStats.dataVolume;
160
      m_archiveJob->tapeFile.blockId = output->getBlockId();
161
      reportPacker.reportCompletedJob(std::move(m_archiveJob), lc);
Victor Kotlyar's avatar
Victor Kotlyar committed
162
      m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
163
      m_taskStats.totalTime = localTime.secs();
164
      // Log the successful transfer      
Victor Kotlyar's avatar
Victor Kotlyar committed
165
      logWithStats(cta::log::INFO, "File successfully transmitted to drive",lc);
166
    } 
167
    catch(const castor::tape::tapeserver::daemon::ErrorFlag&){
168
169
      // We end up there because another task has failed 
      // so we just log, circulate blocks and don't even send a report 
Victor Kotlyar's avatar
Victor Kotlyar committed
170
      lc.log(cta::log::DEBUG,"TapeWriteTask: a previous file has failed for migration "
171
172
      "Do nothing except circulating blocks");
      circulateMemBlocks();
173
      
174
175
      // We throw again because we want TWST to stop all tasks from execution 
      // and go into a degraded mode operation.
176
      throw;
177
    }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
    catch(const Skip& s) {
      // We failed to read anything from the file. We can get rid of any block from the queue to
      // recycle them, and pass the report to the report packer. After than, we can carry on with 
      // the write session.
      circulateMemBlocks();
      watchdog.addToErrorCount("Info_fileSkipped");
      m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
      m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile;
      m_taskStats.filesCount ++;
      // Record the fSeq in the tape session
      session.reportWrittenFSeq(m_archiveJob->tapeFile.fSeq);
      reportPacker.reportSkippedJob(std::move(m_archiveJob), s, lc);
      m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
      m_taskStats.totalTime = localTime.secs();
      // Log the successful transfer      
      logWithStats(cta::log::INFO, "Left placeholder on tape after skipping unreadable file.", lc);
    } catch(const cta::exception::Exception& e){
195
196
197
      //we can end up there because
      //we failed to open the WriteFile
      //we received a bad block or a block written failed
198
199
      //close failed
      
200
      //first set the error flag: we can't proceed any further with writes.
201
      m_errorFlag.set();
202
      
203
204
205
206
207
      // If we reached the end of tape, this is not an error (ENOSPC)
      try {
        // If it's not the error we're looking for, we will go about our business
        // in the catch section. dynamic cast will throw, and we'll do ourselves
        // if the error code is not the one we want.
208
209
        const cta::exception::Errnum & en = 
          dynamic_cast<const cta::exception::Errnum &>(e);
210
211
212
213
        if(en.errorNumber()!= ENOSPC) {
          throw 0;
        }
        // This is indeed the end of the tape. Not an error.
214
        watchdog.setErrorCount("Info_tapeFilledUp",1);
215
        reportPacker.reportTapeFull(lc);
216
217
218
219
220
221
222
      } catch (...) {
        // The error is not an ENOSPC, so it is, indeed, an error.
        // If we got here with a new error, currentErrorToCount will be non-empty,
        // and we will pass the error name to the watchdog.
        if(currentErrorToCount.size()) {
          watchdog.addToErrorCount(currentErrorToCount);
        }
223
      }
224
225

      //log and circulate blocks
226
      // We want to report internal error most of the time to avoid wrong
227
228
229
      // interpretation down the chain. Nevertheless, if the exception
      // if of type Errnum AND the errorCode is ENOSPC, we will propagate it.
      // This is how we communicate the fact that a tape is full to the client.
230
      // We also change the log level to INFO for the case of end of tape.
231
      int errorCode = 666; // TODO - Remove error code
Victor Kotlyar's avatar
Victor Kotlyar committed
232
      int errorLevel = cta::log::ERR;
233
      bool doReportJobError = true;
234
      try {
235
236
        const cta::exception::Errnum & errnum = 
            dynamic_cast<const cta::exception::Errnum &> (e);
237
        if (ENOSPC == errnum.errorNumber()) {
238
          errorCode = ENOSPC;
Victor Kotlyar's avatar
Victor Kotlyar committed
239
          errorLevel = cta::log::INFO;
240
          doReportJobError = false;
241
        }
242
243
      } catch (...) {}
      LogContext::ScopedParam sp(lc, Param("exceptionCode",errorCode));
244
      LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
245
      lc.log(errorLevel,"An error occurred for this file. End of migrations.");
246
      circulateMemBlocks();
247
      if (doReportJobError) reportPacker.reportFailedJob(std::move(m_archiveJob),e, lc);
248
249
250
  
      //we throw again because we want TWST to stop all tasks from execution 
      //and go into a degraded mode operation.
251
      throw;
252
253
    }
    watchdog.fileFinished();
David COME's avatar
David COME committed
254
   }
255
256
257
//------------------------------------------------------------------------------
// getFreeBlock
//------------------------------------------------------------------------------    
258
259
260
  /**
   * @return a free memory block obtained from this task's data pipeline
   *         (m_fifo); the behaviour when none is available is that of
   *         DataPipeline::getFreeBlock().
   */
  MemBlock * TapeWriteTask::getFreeBlock() {
    MemBlock * const freeBlock = m_fifo.getFreeBlock();
    return freeBlock;
  }
261
262
263
//------------------------------------------------------------------------------
// checkErrors
//------------------------------------------------------------------------------  
Victor Kotlyar's avatar
Victor Kotlyar committed
264
265
  /**
   * Validates a memory block popped from the FIFO before it is written.
   * A block is rejected when it belongs to another archive file, is not the
   * expected block number, or is flagged as failed or cancelled.
   * @param mb the block to check
   * @param memBlockId the block index this task expects next
   * @param lc log context
   * @throws cta::exception::Exception after setting the error flag and logging
   */
  void TapeWriteTask::checkErrors(MemBlock* mb,int memBlockId,cta::log::LogContext&  lc){
    using namespace cta::log;
    // Nothing to report for a block that is the one we expect and is healthy.
    if (m_archiveJob->archiveFile.archiveFileID == mb->m_fileid
            && memBlockId == mb->m_fileBlock
            && !mb->isFailed()
            && !mb->isCanceled()) {
      return;
    }

    // Diagnostic parameters attached to the error log below.
    ScopedParamContainer diagnostics(lc);
    diagnostics.add("received_archiveFileID", mb->m_fileid)
               .add("expected_NSBLOCKId", memBlockId)
               .add("received_NSBLOCKId", mb->m_fileBlock)
               .add("failed_Status", mb->isFailed());

    // Pick the most specific explanation available.
    const std::string errorMsg = mb->isFailed()
        ? mb->errorMsg()
        : (mb->isCanceled()
             ? std::string("Received a block marked as cancelled")
             : std::string("Mismatch between expected and received file id or blockid"));

    // Set the error flag for the session (in case of mismatch)
    m_errorFlag.set();
    lc.log(cta::log::ERR,errorMsg);
    throw cta::exception::Exception(errorMsg);
  }
291
292
293
//------------------------------------------------------------------------------
// pushDataBlock
//------------------------------------------------------------------------------   
294
   /**
    * Hands a filled memory block to this task's FIFO.
    * The push is done under m_producerProtection, the same mutex the
    * destructor acquires, so destruction cannot complete mid-push.
    * @param mb the block to enqueue
    */
   void TapeWriteTask::pushDataBlock(MemBlock *mb) {
    cta::threading::MutexLocker producerLock(m_producerProtection);
    m_fifo.pushDataBlock(mb);
  }
  
299
300
301
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------   
302
   /**
    * Destructor. Takes the producer mutex (released again on return) so that
    * it waits for any pushDataBlock() call currently holding it to finish.
    */
   TapeWriteTask::~TapeWriteTask() {
    cta::threading::MutexLocker waitForProducers(m_producerProtection);
  }
305
306
//------------------------------------------------------------------------------
// openWriteFile
307
//------------------------------------------------------------------------------
308
   std::unique_ptr<tapeFile::WriteFile> TapeWriteTask::openWriteFile(
Victor Kotlyar's avatar
Victor Kotlyar committed
309
   tape::tapeFile::WriteSession & session, cta::log::LogContext&  lc){
310
     std::unique_ptr<tape::tapeFile::WriteFile> output;
311
     try{
David COME's avatar
David COME committed
312
       const uint64_t tapeBlockSize = 256*1024;
313
       output.reset(new tape::tapeFile::WriteFile(&session, *m_archiveJob,tapeBlockSize));
Victor Kotlyar's avatar
Victor Kotlyar committed
314
       lc.log(cta::log::DEBUG, "Successfully opened the tape file for writing");
315
     }
316
     catch(const cta::exception::Exception & ex){
Victor Kotlyar's avatar
Victor Kotlyar committed
317
318
       cta::log::LogContext::ScopedParam sp(lc, cta::log::Param("exceptionMessage", ex.getMessageValue()));
       lc.log(cta::log::ERR, "Failed to open tape file for writing");
319
320
321
322
       throw;
     }
     return output;
   }
323
324
325
//------------------------------------------------------------------------------
// circulateMemBlocks
//------------------------------------------------------------------------------   
326
327
328
   void TapeWriteTask::circulateMemBlocks(){
     while(!m_fifo.finished()) {
        m_memManager.releaseBlock(m_fifo.popDataBlock());
329
//        watchdog.notify();
330
331
     }
   }
332
333
334
335
336
337
//------------------------------------------------------------------------------
// hasAnotherTaskTailed
//------------------------------------------------------------------------------      
   void TapeWriteTask::hasAnotherTaskTailed() const {
    //if a task has signaled an error, we stop our job
    if(m_errorFlag){
338
      throw  castor::tape::tapeserver::daemon::ErrorFlag();
339
340
    }
  }
341
342
   
   void TapeWriteTask::logWithStats(int level, const std::string& msg,
Victor Kotlyar's avatar
Victor Kotlyar committed
343
344
   cta::log::LogContext&  lc) const{
     cta::log::ScopedParamContainer params(lc);
345
     params.add("readWriteTime", m_taskStats.readWriteTime)
346
347
348
           .add("checksumingTime",m_taskStats.checksumingTime)
           .add("waitDataTime",m_taskStats.waitDataTime)
           .add("waitReportingTime",m_taskStats.waitReportingTime)
349
           .add("transferTime",m_taskStats.transferTime())
350
           .add("totalTime", m_taskStats.totalTime)
351
352
           .add("dataVolume",m_taskStats.dataVolume)
           .add("headerVolume",m_taskStats.headerVolume)
353
           .add("driveTransferSpeedMBps",m_taskStats.totalTime?
354
355
                  1.0*(m_taskStats.dataVolume+m_taskStats.headerVolume)
                  /1000/1000/m_taskStats.totalTime:0.0)
356
357
           .add("payloadTransferSpeedMBps",m_taskStats.totalTime?
                   1.0*m_taskStats.dataVolume/1000/1000/m_taskStats.totalTime:0.0)
358
           .add("fileSize",m_archiveFile.fileSize)
359
           .add("fileId",m_archiveFile.archiveFileID)
360
361
           .add("fSeq",m_tapeFile.fSeq)
           .add("reconciliationTime",m_archiveFile.reconciliationTime)
362
           .add("LBPMode", m_LBPMode);
363
364
365
366
     
     lc.log(level, msg);

   }
367
368
369
370
371
372
//------------------------------------------------------------------------------
//   getTaskStats
//------------------------------------------------------------------------------
/**
 * @return a copy of the statistics accumulated by this task.
 */
const TapeSessionStats TapeWriteTask::getTaskStats() const {
  const TapeSessionStats snapshot = m_taskStats;
  return snapshot;
}
373
374
375
}}}}