TapeWriteTask.cpp 15.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

24
25
#include "castor/exception/Exception.hpp"
#include "castor/exception/Errnum.hpp"
26
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
David COME's avatar
David COME committed
27
#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
28
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
29
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
30
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
31
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
32
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
33
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
34
#include "castor/tape/tapeserver/daemon/ErrorFlag.hpp"
35
36
#include "castor/tape/tapeserver/file/File.hpp" 
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
37
#include "serrno.h"
38

39
40
41
42
43
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {

44
45
46
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
47
  // Constructor: builds the write task for one archive job.
  // The fifo is constructed with blockCount and is registered with the
  // migration memory manager as a client, so that it can be given memory
  // blocks.
  TapeWriteTask::TapeWriteTask(int blockCount, cta::ArchiveJob *archiveJob,
          MigrationMemoryManager& mm, castor::server::AtomicFlag& errorFlag):
    m_archiveJob(archiveJob),
    m_memManager(mm),
    m_fifo(blockCount),
    m_blockCount(blockCount),
    m_errorFlag(errorFlag)
  {
    // Become a client of the memory manager: it will hand memory blocks
    // to our fifo.
    mm.addClient(&m_fifo);
  }
55
56
57
//------------------------------------------------------------------------------
// fileSize
//------------------------------------------------------------------------------
58
  uint64_t TapeWriteTask::fileSize() { 
59
    return m_archiveJob->archiveFile.size; 
60
  }
61
62
63
//------------------------------------------------------------------------------
// execute
//------------------------------------------------------------------------------  
64
   void TapeWriteTask::execute(castor::tape::tapeFile::WriteSession & session,
65
           MigrationReportPacker & reportPacker, MigrationWatchDog & watchdog,
66
           castor::log::LogContext& lc, castor::utils::Timer & timer) {
67
68
    using castor::log::LogContext;
    using castor::log::Param;
69
70
71
    using castor::log::ScopedParamContainer;
    // Add to our logs the informations on the file
    ScopedParamContainer params(lc);
72
73
74
    params.add("NSHOSTNAME", m_archiveJob->archiveFile.nsHostName)
          .add("NSFILEID",m_archiveJob->archiveFile.fileId)
          .add("lastKnownFilename",m_archiveJob->archiveFile.path)
75
          .add("fileSize",m_archiveJob->archiveFile.size)
76
          .add("fSeq",m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq)
77
          .add("path",m_archiveJob->remotePathAndStatus.path.getRaw());
78
    
79
80
    // We will clock the stats for the file itself, and eventually add those
    // stats to the session's.
81
    
82
    castor::utils::Timer localTime;
83
    
David COME's avatar
David COME committed
84
    unsigned long ckSum = Payload::zeroAdler32();
85
    
86
    uint32_t memBlockId  = 0;
87
    
88
89
90
91
    // This out-of-try-catch variables allows us to record the stage of the 
    // process we're in, and to count the error if it occurs.
    // We will not record errors for an empty string. This will allow us to
    // prevent counting where error happened upstream.
92
    std::string currentErrorToCount = "Error_tapeFSeqOutOfSequenceForWrite";
93
    session.validateNextFSeq(m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq);
David COME's avatar
David COME committed
94
    try {
95
      //try to open the session
96
      currentErrorToCount = "Error_tapeWriteHeader";
97
      watchdog.notifyBeginNewJob(m_archiveJob->archiveFile.path, m_archiveJob->archiveFile.fileId, m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq);
98
      std::unique_ptr<castor::tape::tapeFile::WriteFile> output(openWriteFile(session,lc));
99
      m_taskStats.readWriteTime += timer.secs(castor::utils::Timer::resetCounter);
100
      m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile;
101
102
      // We are not error sources here until we actually write.
      currentErrorToCount = "";
David COME's avatar
David COME committed
103
      while(!m_fifo.finished()) {
Eric Cano's avatar
WIP    
Eric Cano committed
104
        MemBlock* const mb = m_fifo.popDataBlock();
105
        m_taskStats.waitDataTime += timer.secs(castor::utils::Timer::resetCounter);
106
        AutoReleaseBlock<MigrationMemoryManager> releaser(mb,m_memManager);
107
        
108
        //will throw (thus exiting the loop) if something is wrong
109
        checkErrors(mb,memBlockId,lc);
110
        
111
        ckSum =  mb->m_payload.adler32(ckSum);
112
        m_taskStats.checksumingTime += timer.secs(castor::utils::Timer::resetCounter);
113
        currentErrorToCount = "Error_tapeWriteData";
114
        mb->m_payload.write(*output);
115
        currentErrorToCount = "";
116
        
117
        m_taskStats.readWriteTime += timer.secs(castor::utils::Timer::resetCounter);
118
        m_taskStats.dataVolume += mb->m_payload.size();
119
        watchdog.notify();
120
        ++memBlockId;
121
      }
122
123
124
      
      //finish the writing of the file on tape
      //put the trailer
125
      currentErrorToCount = "Error_tapeWriteTrailer";
126
      output->close();
127
      currentErrorToCount = "";
128
      m_taskStats.readWriteTime += timer.secs(castor::utils::Timer::resetCounter);
129
      m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile;
130
      m_taskStats.filesCount ++;
131
      // Record the fSeq in the tape session
132
133
134
135
136
137
      session.reportWrittenFSeq(m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq);
      m_archiveJob->nameServerTapeFile.checksum = 
          cta::Checksum(cta::Checksum::CHECKSUMTYPE_ADLER32, cta::ByteArray(ckSum));
      m_archiveJob->nameServerTapeFile.compressedSize = m_taskStats.dataVolume;
      m_archiveJob->nameServerTapeFile.tapeFileLocation.blockId = output->getBlockId();
      reportPacker.reportCompletedJob(std::move(m_archiveJob));
138
      m_taskStats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
139
      m_taskStats.totalTime = localTime.secs();
140
      // Log the successful transfer      
141
      logWithStats(LOG_INFO, "File successfully transmitted to drive",lc);
142
    } 
143
    catch(const castor::tape::tapeserver::daemon::ErrorFlag&){
144
145
      // We end up there because another task has failed 
      // so we just log, circulate blocks and don't even send a report 
146
      lc.log(LOG_DEBUG,"TapeWriteTask: a previous file has failed for migration "
147
148
      "Do nothing except circulating blocks");
      circulateMemBlocks();
149
      
150
151
      // We throw again because we want TWST to stop all tasks from execution 
      // and go into a degraded mode operation.
152
      throw;
153
    }
154
    catch(const castor::exception::Exception& e){
155
156
157
      //we can end up there because
      //we failed to open the WriteFile
      //we received a bad block or a block written failed
158
159
      //close failed
      
160
      //first set the error flag: we can't proceed any further with writes.
161
      m_errorFlag.set();
162
      
163
164
165
166
167
168
169
170
171
172
173
      // If we reached the end of tape, this is not an error (ENOSPC)
      try {
        // If it's not the error we're looking for, we will go about our business
        // in the catch section. dynamic cast will throw, and we'll do ourselves
        // if the error code is not the one we want.
        const castor::exception::Errnum & en = 
          dynamic_cast<const castor::exception::Errnum &>(e);
        if(en.errorNumber()!= ENOSPC) {
          throw 0;
        }
        // This is indeed the end of the tape. Not an error.
174
        watchdog.setErrorCount("Info_tapeFilledUp",1);
175
176
177
178
179
180
181
      } catch (...) {
        // The error is not an ENOSPC, so it is, indeed, an error.
        // If we got here with a new error, currentErrorToCount will be non-empty,
        // and we will pass the error name to the watchdog.
        if(currentErrorToCount.size()) {
          watchdog.addToErrorCount(currentErrorToCount);
        }
182
      }
183
184

      //log and circulate blocks
185
      // We want to report internal error most of the time to avoid wrong
186
187
188
      // interpretation down the chain. Nevertheless, if the exception
      // if of type Errnum AND the errorCode is ENOSPC, we will propagate it.
      // This is how we communicate the fact that a tape is full to the client.
189
      // We also change the log level to INFO for the case of end of tape.
190
      int errorCode = e.code();
191
      int errorLevel = LOG_ERR;
192
193
194
      try {
        const castor::exception::Errnum & errnum = 
            dynamic_cast<const castor::exception::Errnum &> (e);
195
        if (ENOSPC == errnum.errorNumber()) {
196
          errorCode = ENOSPC;
197
198
          errorLevel = LOG_INFO;
        }
199
200
      } catch (...) {}
      LogContext::ScopedParam sp(lc, Param("exceptionCode",errorCode));
201
      LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
202
      lc.log(errorLevel,"An error occurred for this file. End of migrations.");
203
      circulateMemBlocks();
204
      reportPacker.reportFailedJob(std::move(m_archiveJob),e);
205
206
207
  
      //we throw again because we want TWST to stop all tasks from execution 
      //and go into a degraded mode operation.
208
      throw;
209
210
    }
    watchdog.fileFinished();
David COME's avatar
David COME committed
211
   }
212
213
214
//------------------------------------------------------------------------------
// getFreeBlock
//------------------------------------------------------------------------------    
215
216
217
  // Returns the memory block obtained from this task's fifo
  // (m_fifo.getFreeBlock()).
  MemBlock * TapeWriteTask::getFreeBlock() {
    MemBlock * const freeBlock = m_fifo.getFreeBlock();
    return freeBlock;
  }
218
219
220
//------------------------------------------------------------------------------
// checkErrors
//------------------------------------------------------------------------------  
221
  // Checks that memory block mb is the one expected for this file. The block
  // must carry this job's file id and the expected block index memBlockId,
  // and must be neither failed nor cancelled.
  // Otherwise it:
  //  - logs the received/expected ids,
  //  - sets the session error flag,
  //  - throws castor::exception::Exception, carrying the block's own error
  //    code and message for a failed block, or SEINTERNAL otherwise.
  void TapeWriteTask::checkErrors(MemBlock* mb,int memBlockId,castor::log::LogContext& lc){
    using namespace castor::log;
    // NOTE(review): m_fileid is narrowed to unsigned int before being
    // compared with the archive file id. If both are 64-bit, ids above
    // 2^32 could never match. Kept as is; confirm the type of
    // MemBlock::m_fileid.
    const bool blockIsGood =
      m_archiveJob->archiveFile.fileId == static_cast<unsigned int>(mb->m_fileid)
      && memBlockId == mb->m_fileBlock
      && !mb->isFailed()
      && !mb->isCanceled();
    if(blockIsGood) {
      return;
    }

    LogContext::ScopedParam sp[]={
      LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
      LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", memBlockId)),
      LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
      LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed()))
    };
    tape::utils::suppresUnusedVariable(sp);

    // Default: an id/block index mismatch. Refined below for failed or
    // cancelled blocks.
    std::string errorMsg = "Mismatch between expected and received file id or blockid";
    int errorCode = SEINTERNAL;
    if(mb->isFailed()) {
      errorMsg = mb->errorMsg();
      errorCode = mb->errorCode();
    } else if(mb->isCanceled()) {
      errorMsg = "Received a block marked as cancelled";
    }

    // Set the error flag for the session (in case of mismatch)
    m_errorFlag.set();
    lc.log(LOG_ERR,errorMsg);
    throw castor::exception::Exception(errorCode,errorMsg);
  }
252
253
254
//------------------------------------------------------------------------------
// pushDataBlock
//------------------------------------------------------------------------------   
255
   // Queues a filled memory block into this task's fifo.
   // The push is done while holding m_producerProtection, the same mutex the
   // destructor acquires. NOTE(review): presumably called from the producer
   // (disk) side; confirm.
   void TapeWriteTask::pushDataBlock(MemBlock *mb) {
    castor::server::MutexLocker producerLock(&m_producerProtection);
    m_fifo.pushDataBlock(mb);
  }
  
260
261
262
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------   
263
   // Destructor: acquires m_producerProtection before the object goes away.
   // NOTE(review): presumably so that destruction waits for a concurrent
   // pushDataBlock() still holding that mutex; confirm.
   TapeWriteTask::~TapeWriteTask() {
    castor::server::MutexLocker producerLock(&m_producerProtection);
  }
266
267
//------------------------------------------------------------------------------
// openWriteFile
268
//------------------------------------------------------------------------------
269
   std::unique_ptr<tapeFile::WriteFile> TapeWriteTask::openWriteFile(
270
   tape::tapeFile::WriteSession & session, log::LogContext& lc){
271
     std::unique_ptr<tape::tapeFile::WriteFile> output;
272
     try{
David COME's avatar
David COME committed
273
       const uint64_t tapeBlockSize = 256*1024;
274
       output.reset(new tape::tapeFile::WriteFile(&session, *m_archiveJob,tapeBlockSize));
275
276
277
       lc.log(LOG_DEBUG, "Successfully opened the tape file for writing");
     }
     catch(const castor::exception::Exception & ex){
David COME's avatar
David COME committed
278
279
       log::LogContext::ScopedParam sp(lc, log::Param("exceptionCode",ex.code()));
       log::LogContext::ScopedParam sp1(lc, log::Param("exceptionMessage", ex.getMessageValue()));
280
281
282
283
284
       lc.log(LOG_ERR, "Failed to open tape file for writing");
       throw;
     }
     return output;
   }
285
286
287
//------------------------------------------------------------------------------
// circulateMemBlocks
//------------------------------------------------------------------------------   
288
289
290
   void TapeWriteTask::circulateMemBlocks(){
     while(!m_fifo.finished()) {
        m_memManager.releaseBlock(m_fifo.popDataBlock());
291
//        watchdog.notify();
292
293
     }
   }
294
295
296
297
298
299
//------------------------------------------------------------------------------
// hasAnotherTaskTailed
//------------------------------------------------------------------------------      
   void TapeWriteTask::hasAnotherTaskTailed() const {
    //if a task has signaled an error, we stop our job
    if(m_errorFlag){
300
      throw  castor::tape::tapeserver::daemon::ErrorFlag();
301
302
    }
  }
303
304
   
   void TapeWriteTask::logWithStats(int level, const std::string& msg,
305
   log::LogContext& lc) const{
306
     log::ScopedParamContainer params(lc);
307
     params.add("readWriteTime", m_taskStats.readWriteTime)
308
309
310
           .add("checksumingTime",m_taskStats.checksumingTime)
           .add("waitDataTime",m_taskStats.waitDataTime)
           .add("waitReportingTime",m_taskStats.waitReportingTime)
311
           .add("transferTime",m_taskStats.transferTime())
312
           .add("totalTime", m_taskStats.totalTime)
313
314
           .add("dataVolume",m_taskStats.dataVolume)
           .add("headerVolume",m_taskStats.headerVolume)
315
316
317
318
319
           .add("driveTransferSpeedMBps",m_taskStats.totalTime?
             (m_taskStats.dataVolume+m_taskStats.headerVolume)
                /1000/1000/m_taskStats.totalTime:0.0)
           .add("payloadTransferSpeedMBps",m_taskStats.totalTime?
                   1.0*m_taskStats.dataVolume/1000/1000/m_taskStats.totalTime:0.0)
320
           .add("fileSize",m_archiveJob->archiveFile.size)
321
322
           .add("NSHOST",m_archiveJob->archiveFile.nsHostName)
           .add("NSFILEID",m_archiveJob->archiveFile.fileId)
323
           .add("fSeq",m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq)
324
           .add("lastKnownFilename",m_archiveJob->archiveFile.path)
325
           .add("lastModificationTime",m_archiveJob->archiveFile.lastModificationTime);
326
327
328
329
     
     lc.log(level, msg);

   }
330
331
332
333
334
335
//------------------------------------------------------------------------------
//   getTaskStats
//------------------------------------------------------------------------------
// Returns a copy of the statistics accumulated by this task.
const TapeSessionStats TapeWriteTask::getTaskStats() const {
  const TapeSessionStats & stats = m_taskStats;
  return stats;
}
336
337
338
}}}}