TapeWriteTask.cpp 15 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

24
#include "common/exception/Errnum.hpp"
25
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
David COME's avatar
David COME committed
26
#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
27
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
28
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
29
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
30
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
31
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
32
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
33
#include "castor/tape/tapeserver/daemon/ErrorFlag.hpp"
34
35
#include "castor/tape/tapeserver/file/File.hpp" 
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
36
#include "common/exception/Exception.hpp"
37

38
39
40
41
42
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {

43
44
45
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
46
  // Build a write task for archiveJob, with a FIFO of blockCount memory blocks
  // registered as a client of the memory manager mm. errorFlag is the flag
  // shared between tasks to signal a failure.
  TapeWriteTask::TapeWriteTask(int blockCount, cta::ArchiveJob *archiveJob,
          MigrationMemoryManager& mm,cta::threading::AtomicFlag& errorFlag):
    m_archiveJob(archiveJob),m_memManager(mm), m_fifo(blockCount),
    m_blockCount(blockCount),m_errorFlag(errorFlag),
    // Copy the file metadata from the constructor argument rather than through
    // m_archiveJob: members are initialised in their declaration order (not in
    // the order of this list), so reading m_archiveJob here would silently
    // depend on it being declared before these members in the header.
    m_archiveFile(archiveJob->archiveFile), m_tapeFile(archiveJob->tapeFile),
    m_srcURL(archiveJob->srcURL)
  {
    //register its fifo to the memory manager as a client in order to get mem block
    mm.addClient(&m_fifo);
  }
56
57
58
//------------------------------------------------------------------------------
// fileSize
//------------------------------------------------------------------------------
59
  uint64_t TapeWriteTask::fileSize() { 
60
    return m_archiveFile.fileSize; 
61
  }
62
63
64
//------------------------------------------------------------------------------
// execute
//------------------------------------------------------------------------------  
65
   void TapeWriteTask::execute(castor::tape::tapeFile::WriteSession & session,
66
           MigrationReportPacker & reportPacker, MigrationWatchDog & watchdog,
Victor Kotlyar's avatar
Victor Kotlyar committed
67
           castor::log::LogContext& lc, cta::utils::Timer & timer) {
68
69
    using castor::log::LogContext;
    using castor::log::Param;
70
71
72
    using castor::log::ScopedParamContainer;
    // Add to our logs the informations on the file
    ScopedParamContainer params(lc);
73
74
75
    params.add("fileId",m_archiveJob->archiveFile.archiveFileID)
          .add("fileSize",m_archiveJob->archiveFile.fileSize)
          .add("fSeq",m_archiveJob->tapeFile.fSeq)
76
          .add("diskURL",m_archiveJob->srcURL);
77
    
78
79
    // We will clock the stats for the file itself, and eventually add those
    // stats to the session's.
Victor Kotlyar's avatar
Victor Kotlyar committed
80
    cta::utils::Timer localTime;
David COME's avatar
David COME committed
81
    unsigned long ckSum = Payload::zeroAdler32();
82
    uint32_t memBlockId  = 0;
83
    
84
85
86
87
    // This out-of-try-catch variables allows us to record the stage of the 
    // process we're in, and to count the error if it occurs.
    // We will not record errors for an empty string. This will allow us to
    // prevent counting where error happened upstream.
88
    std::string currentErrorToCount = "Error_tapeFSeqOutOfSequenceForWrite";
89
    session.validateNextFSeq(m_archiveJob->tapeFile.fSeq);
David COME's avatar
David COME committed
90
    try {
91
      //try to open the session
92
      currentErrorToCount = "Error_tapeWriteHeader";
93
      watchdog.notifyBeginNewJob(m_archiveJob->archiveFile.archiveFileID, m_archiveJob->tapeFile.fSeq);
94
      std::unique_ptr<castor::tape::tapeFile::WriteFile> output(openWriteFile(session,lc));
95
      m_LBPMode = output->getLBPMode();
Victor Kotlyar's avatar
Victor Kotlyar committed
96
      m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
97
      m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile;
98
99
      // We are not error sources here until we actually write.
      currentErrorToCount = "";
David COME's avatar
David COME committed
100
      while(!m_fifo.finished()) {
Eric Cano's avatar
WIP    
Eric Cano committed
101
        MemBlock* const mb = m_fifo.popDataBlock();
Victor Kotlyar's avatar
Victor Kotlyar committed
102
        m_taskStats.waitDataTime += timer.secs(cta::utils::Timer::resetCounter);
103
        AutoReleaseBlock<MigrationMemoryManager> releaser(mb,m_memManager);
104
        
105
        //will throw (thus exiting the loop) if something is wrong
106
        checkErrors(mb,memBlockId,lc);
107
        
108
        ckSum =  mb->m_payload.adler32(ckSum);
Victor Kotlyar's avatar
Victor Kotlyar committed
109
        m_taskStats.checksumingTime += timer.secs(cta::utils::Timer::resetCounter);
110
        currentErrorToCount = "Error_tapeWriteData";
111
        mb->m_payload.write(*output);
112
        currentErrorToCount = "";
113
        
Victor Kotlyar's avatar
Victor Kotlyar committed
114
        m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
115
        m_taskStats.dataVolume += mb->m_payload.size();
116
        watchdog.notify();
117
        ++memBlockId;
118
      }
119
120
121
      
      //finish the writing of the file on tape
      //put the trailer
122
      currentErrorToCount = "Error_tapeWriteTrailer";
123
      output->close();
124
      currentErrorToCount = "";
Victor Kotlyar's avatar
Victor Kotlyar committed
125
      m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
126
      m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile;
127
      m_taskStats.filesCount ++;
128
      // Record the fSeq in the tape session
129
      session.reportWrittenFSeq(m_archiveJob->tapeFile.fSeq);
130
      m_archiveJob->tapeFile.checksumType = "ADLER32";
131
132
      { 
        std::stringstream cs;
133
        cs << std::hex << std::showbase << std::uppercase << std::setfill('0') << std::setw(8) << (uint32_t)ckSum;
134
135
136
137
        m_archiveJob->tapeFile.checksumValue = cs.str();
      }
      m_archiveJob->tapeFile.compressedSize = m_taskStats.dataVolume;
      m_archiveJob->tapeFile.blockId = output->getBlockId();
138
      reportPacker.reportCompletedJob(std::move(m_archiveJob));
Victor Kotlyar's avatar
Victor Kotlyar committed
139
      m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
140
      m_taskStats.totalTime = localTime.secs();
141
      // Log the successful transfer      
142
      logWithStats(LOG_INFO, "File successfully transmitted to drive",lc);
143
    } 
144
    catch(const castor::tape::tapeserver::daemon::ErrorFlag&){
145
146
      // We end up there because another task has failed 
      // so we just log, circulate blocks and don't even send a report 
147
      lc.log(LOG_DEBUG,"TapeWriteTask: a previous file has failed for migration "
148
149
      "Do nothing except circulating blocks");
      circulateMemBlocks();
150
      
151
152
      // We throw again because we want TWST to stop all tasks from execution 
      // and go into a degraded mode operation.
153
      throw;
154
    }
155
    catch(const cta::exception::Exception& e){
156
157
158
      //we can end up there because
      //we failed to open the WriteFile
      //we received a bad block or a block written failed
159
160
      //close failed
      
161
      //first set the error flag: we can't proceed any further with writes.
162
      m_errorFlag.set();
163
      
164
165
166
167
168
      // If we reached the end of tape, this is not an error (ENOSPC)
      try {
        // If it's not the error we're looking for, we will go about our business
        // in the catch section. dynamic cast will throw, and we'll do ourselves
        // if the error code is not the one we want.
169
170
        const cta::exception::Errnum & en = 
          dynamic_cast<const cta::exception::Errnum &>(e);
171
172
173
174
        if(en.errorNumber()!= ENOSPC) {
          throw 0;
        }
        // This is indeed the end of the tape. Not an error.
175
        watchdog.setErrorCount("Info_tapeFilledUp",1);
176
        reportPacker.reportTapeFull();
177
178
179
180
181
182
183
      } catch (...) {
        // The error is not an ENOSPC, so it is, indeed, an error.
        // If we got here with a new error, currentErrorToCount will be non-empty,
        // and we will pass the error name to the watchdog.
        if(currentErrorToCount.size()) {
          watchdog.addToErrorCount(currentErrorToCount);
        }
184
      }
185
186

      //log and circulate blocks
187
      // We want to report internal error most of the time to avoid wrong
188
189
190
      // interpretation down the chain. Nevertheless, if the exception
      // if of type Errnum AND the errorCode is ENOSPC, we will propagate it.
      // This is how we communicate the fact that a tape is full to the client.
191
      // We also change the log level to INFO for the case of end of tape.
192
      int errorCode = 666; // TODO - Remove error code
193
      int errorLevel = LOG_ERR;
194
      try {
195
196
        const cta::exception::Errnum & errnum = 
            dynamic_cast<const cta::exception::Errnum &> (e);
197
        if (ENOSPC == errnum.errorNumber()) {
198
          errorCode = ENOSPC;
199
200
          errorLevel = LOG_INFO;
        }
201
202
      } catch (...) {}
      LogContext::ScopedParam sp(lc, Param("exceptionCode",errorCode));
203
      LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
204
      lc.log(errorLevel,"An error occurred for this file. End of migrations.");
205
      circulateMemBlocks();
206
      reportPacker.reportFailedJob(std::move(m_archiveJob),e);
207
208
209
  
      //we throw again because we want TWST to stop all tasks from execution 
      //and go into a degraded mode operation.
210
      throw;
211
212
    }
    watchdog.fileFinished();
David COME's avatar
David COME committed
213
   }
214
215
216
//------------------------------------------------------------------------------
// getFreeBlock
//------------------------------------------------------------------------------    
217
218
219
  // Hand out a free memory block taken from this task's FIFO.
  MemBlock * TapeWriteTask::getFreeBlock() {
    MemBlock * const freeBlock = m_fifo.getFreeBlock();
    return freeBlock;
  }
220
221
222
//------------------------------------------------------------------------------
// checkErrors
//------------------------------------------------------------------------------  
223
  void TapeWriteTask::checkErrors(MemBlock* mb,int memBlockId,castor::log::LogContext& lc){
    // Validate a block popped from the FIFO. It must:
    // - belong to this task's archive file;
    // - carry the expected block index (memBlockId);
    // - be neither failed nor cancelled.
    // Otherwise, log the received/expected identifiers, raise the shared error
    // flag and throw a cta::exception::Exception with a descriptive message.
    using namespace castor::log;
    const bool blockIsExpected =
         m_archiveJob->archiveFile.archiveFileID == mb->m_fileid
      && memBlockId == mb->m_fileBlock
      && !mb->isFailed()
      && !mb->isCanceled();
    if (blockIsExpected) {
      return;
    }

    LogContext::ScopedParam sp[]={
      LogContext::ScopedParam(lc, Param("received_archiveFileID", mb->m_fileid)),
      LogContext::ScopedParam(lc, Param("expected_NSBLOCKId", memBlockId)),
      LogContext::ScopedParam(lc, Param("received_NSBLOCKId", mb->m_fileBlock)),
      LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed()))
    };
    tape::utils::suppresUnusedVariable(sp);

    // A failed block carries its own message; otherwise describe the cause.
    const std::string errorMsg =
        mb->isFailed()   ? std::string(mb->errorMsg())
      : mb->isCanceled() ? std::string("Received a block marked as cancelled")
      :                    std::string("Mismatch between expected and received file id or blockid");

    // Set the error flag for the session (in case of mismatch)
    m_errorFlag.set();
    lc.log(LOG_ERR,errorMsg);
    throw cta::exception::Exception(errorMsg);
  }
250
251
252
//------------------------------------------------------------------------------
// pushDataBlock
//------------------------------------------------------------------------------   
253
   // Queue a filled memory block into this task's FIFO. The push happens while
   // holding m_producerProtection, the same mutex the destructor takes.
   void TapeWriteTask::pushDataBlock(MemBlock *mb) {
     cta::threading::MutexLocker producerLock(m_producerProtection);
     m_fifo.pushDataBlock(mb);
   }
  
258
259
260
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------   
261
   TapeWriteTask::~TapeWriteTask() {
     // Acquire m_producerProtection (also held by pushDataBlock) before the
     // object goes away; presumably this makes destruction wait for any push
     // in progress to complete.
     cta::threading::MutexLocker producerLock(m_producerProtection);
   }
264
265
//------------------------------------------------------------------------------
// openWriteFile
266
//------------------------------------------------------------------------------
267
   std::unique_ptr<tapeFile::WriteFile> TapeWriteTask::openWriteFile(
268
   tape::tapeFile::WriteSession & session, log::LogContext& lc){
269
     std::unique_ptr<tape::tapeFile::WriteFile> output;
270
     try{
David COME's avatar
David COME committed
271
       const uint64_t tapeBlockSize = 256*1024;
272
       output.reset(new tape::tapeFile::WriteFile(&session, *m_archiveJob,tapeBlockSize));
273
274
       lc.log(LOG_DEBUG, "Successfully opened the tape file for writing");
     }
275
276
     catch(const cta::exception::Exception & ex){
       log::LogContext::ScopedParam sp(lc, log::Param("exceptionMessage", ex.getMessageValue()));
277
278
279
280
281
       lc.log(LOG_ERR, "Failed to open tape file for writing");
       throw;
     }
     return output;
   }
282
283
284
//------------------------------------------------------------------------------
// circulateMemBlocks
//------------------------------------------------------------------------------   
285
286
287
   void TapeWriteTask::circulateMemBlocks(){
     while(!m_fifo.finished()) {
        m_memManager.releaseBlock(m_fifo.popDataBlock());
288
//        watchdog.notify();
289
290
     }
   }
291
292
293
294
295
296
//------------------------------------------------------------------------------
// hasAnotherTaskTailed
//------------------------------------------------------------------------------      
   void TapeWriteTask::hasAnotherTaskTailed() const {
    //if a task has signaled an error, we stop our job
    if(m_errorFlag){
297
      throw  castor::tape::tapeserver::daemon::ErrorFlag();
298
299
    }
  }
300
301
   
   void TapeWriteTask::logWithStats(int level, const std::string& msg,
302
   log::LogContext& lc) const{
303
     log::ScopedParamContainer params(lc);
304
     params.add("readWriteTime", m_taskStats.readWriteTime)
305
306
307
           .add("checksumingTime",m_taskStats.checksumingTime)
           .add("waitDataTime",m_taskStats.waitDataTime)
           .add("waitReportingTime",m_taskStats.waitReportingTime)
308
           .add("transferTime",m_taskStats.transferTime())
309
           .add("totalTime", m_taskStats.totalTime)
310
311
           .add("dataVolume",m_taskStats.dataVolume)
           .add("headerVolume",m_taskStats.headerVolume)
312
313
314
315
316
           .add("driveTransferSpeedMBps",m_taskStats.totalTime?
             (m_taskStats.dataVolume+m_taskStats.headerVolume)
                /1000/1000/m_taskStats.totalTime:0.0)
           .add("payloadTransferSpeedMBps",m_taskStats.totalTime?
                   1.0*m_taskStats.dataVolume/1000/1000/m_taskStats.totalTime:0.0)
317
           .add("fileSize",m_archiveFile.fileSize)
318
           .add("archiveFileID",m_archiveFile.archiveFileID)
319
320
           .add("fSeq",m_tapeFile.fSeq)
           .add("reconciliationTime",m_archiveFile.reconciliationTime)
321
           .add("LBPMode", m_LBPMode);
322
323
324
325
     
     lc.log(level, msg);

   }
326
327
328
329
330
331
//------------------------------------------------------------------------------
//   getTaskStats
//------------------------------------------------------------------------------
// Return a copy of the statistics accumulated by this task.
const TapeSessionStats TapeWriteTask::getTaskStats() const {
  return TapeSessionStats(m_taskStats);
}
332
333
334
}}}}