/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
#include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/exception/Exception.hpp"
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
#include "castor/utils/Timer.hpp"

#include <memory>
#include <string>

namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
40
41
42
43
  /**
   * This class is in charge of 
   * 
   */
44
class TapeReadTask {
45
public:
46
47
48
49
50
51
  /**
   * COnstructor
   * @param ftr The file being recalled. We acquire the ownership on the pointer
   * @param destination the task that will consume the memory blocks
   * @param mm The memory manager to get free block
   */
52
  TapeReadTask(cta::RetrieveJob *retrieveJob,
53
    DataConsumer & destination, RecallMemoryManager & mm): 
54
    m_retrieveJob(retrieveJob), m_fifo(destination), m_mm(mm) {}
55
    
56
57
58
59
60
61
62
    /**
     * @param rs the read session holding all we need to be able to read from the tape
     * @param lc the log context for .. logging purpose
     * The actual function that will do the job.
     * The main loop is :
     * Acquire a free memory block from the memory manager , fill it, push it 
     */
63
  void execute(castor::tape::tapeFile::ReadSession & rs,
64
    castor::log::LogContext & lc,RecallWatchDog& watchdog,
65
    TapeSessionStats & stats, castor::utils::Timer & timer) {
66

67
68
69
    using castor::log::Param;
    typedef castor::log::LogContext::ScopedParam ScopedParam;
    
70
    // Set the common context for all the coming logs (file info)
71
    log::ScopedParamContainer params(lc);
72
73
74
    params.add("NSFILEID", m_retrieveJob->archiveFile.archiveFileID)
          .add("BlockId", m_retrieveJob->tapeFile.blockId)
          .add("fSeq", m_retrieveJob->tapeFile.fSeq)
75
          .add("path", m_retrieveJob->remotePath);
76
    
77
78
    // We will clock the stats for the file itself, and eventually add those
    // stats to the session's.
79
    TapeSessionStats localStats;
80
    std::string LBPMode;
81
    castor::utils::Timer localTime;
82
    castor::utils::Timer totalTime(localTime);
83

84
85
    // Read the file and transmit it
    bool stillReading = true;
86
87
    //for counting how many mem blocks have used and how many tape blocks
    //(because one mem block can hold several tape blocks
88
89
    int fileBlock = 0;
    int tapeBlock = 0;
90
91
92
93
94
    // This out-of-try-catch variables allows us to record the stage of the 
    // process we're in, and to count the error if it occurs.
    // We will not record errors for an empty string. This will allow us to
    // prevent counting where error happened upstream.
    std::string currentErrorToCount = "";
95
96
    MemBlock* mb=NULL;
    try {
97
      currentErrorToCount = "Error_tapePositionForRead";
98
      std::unique_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
99
      LBPMode = rf->getLBPMode();
100
101
102
      // At that point we already read the header.
      localStats.headerVolume += TapeSessionStats::headerVolumePerFile;

103
      lc.log(LOG_INFO, "Successfully positioned for reading");
104
      localStats.positionTime += timer.secs(castor::utils::Timer::resetCounter);
105
      watchdog.notifyBeginNewJob(m_retrieveJob->archiveFile.archiveFileID, m_retrieveJob->tapeFile.fSeq);
106
      localStats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
107
      currentErrorToCount = "Error_tapeReadData";
108
109
110
      while (stillReading) {
        // Get a memory block and add information to its metadata
        mb=m_mm.getFreeBlock();
111
        localStats.waitFreeMemoryTime += timer.secs(castor::utils::Timer::resetCounter);
112
        
113
        mb->m_fSeq = m_retrieveJob->tapeFile.fSeq;
114
        mb->m_fileBlock = fileBlock++;
115
        mb->m_fileid = m_retrieveJob->archiveFile.archiveFileID;
116
117
118
119
120
121
122
        mb->m_tapeFileBlock = tapeBlock;
        mb->m_tapeBlockSize = rf->getBlockSize();
        try {
          // Fill up the memory block with tape block
          // append conveniently returns false when there will not be more space
          // for an extra tape block, and throws an exception if we reached the
          // end of file. append() also protects against reading too big tape blocks.
123
124
125
          while (mb->m_payload.append(*rf)) {
            tapeBlock++;
          }
126
        } catch (const castor::exception::EndOfFile&) {
127
128
129
          // append() signaled the end of the file.
          stillReading = false;
        }
130
        localStats.readWriteTime += timer.secs(castor::utils::Timer::resetCounter);
131
132
133
        localStats.dataVolume += mb->m_payload.size();
        // Pass the block to the disk write task
        m_fifo.pushDataBlock(mb);
134
        mb=NULL;
135
        watchdog.notify();
136
        localStats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
137
      } //end of while(stillReading)
138
139
      //  we have to signal the end of the tape read to the disk write task.
      m_fifo.pushDataBlock(NULL);
140
      // Log the successful transfer
141
      localStats.totalTime = localTime.secs();
142
143
144
145
      // Count the trailer size
      localStats.headerVolume += TapeSessionStats::trailerVolumePerFile;
      // We now transmitted one file:
      localStats.filesCount++;
146
      params.add("positionTime", localStats.positionTime)
147
            .add("readWriteTime", localStats.readWriteTime)
148
149
            .add("waitFreeMemoryTime",localStats.waitFreeMemoryTime)
            .add("waitReportingTime",localStats.waitReportingTime)
150
            .add("transferTime",localStats.transferTime())
151
            .add("totalTime", localStats.totalTime)
152
            .add("dataVolume",localStats.dataVolume)
153
            .add("headerVolume",localStats.headerVolume)
154
            .add("driveTransferSpeedMBps",
155
156
                    localStats.totalTime?(1.0*localStats.dataVolume+1.0*localStats.headerVolume)
                     /1000/1000/localStats.totalTime:0)
157
            .add("payloadTransferSpeedMBps",
158
159
                     localStats.totalTime?1.0*localStats.dataVolume/1000/1000/localStats.totalTime:0)
            .add("LBPMode", LBPMode);
Eric Cano's avatar
Eric Cano committed
160
      lc.log(LOG_INFO, "File successfully read from tape");
161
162
163
      // Add the local counts to the session's
      stats.add(localStats);
    } //end of try
164
    catch (const castor::exception::Exception & ex) {
165
      //we end up there because :
166
      //-- openReadFile brought us here (cant position to the file)
167
      //-- m_payload.append brought us here (error while reading the file)
168
169
170
171
      // Record the error in the watchdog
      if (currentErrorToCount.size()) {
        watchdog.addToErrorCount(currentErrorToCount);
      }
172
173
174
175
176
      // This is an error case. Log and signal to the disk write task
      { 
        castor::log::LogContext::ScopedParam sp0(lc, Param("fileBlock", fileBlock));
        castor::log::LogContext::ScopedParam sp1(lc, Param("ErrorMessage", ex.getMessageValue()));
        castor::log::LogContext::ScopedParam sp2(lc, Param("ErrorCode", ex.code()));
177
        lc.log(LOG_ERR, "Error reading a file in TapeReadFileTask (backtrace follows)");
178
179
180
181
182
183
      }
      {
        castor::log::LogContext lc2(lc.logger());
        lc2.logBacktrace(LOG_ERR, ex.backtrace());
      }
      
184
185
      // mb might or might not be allocated at this point, but 
      // reportErrorToDiskTask will deal with the allocation if required.
186
      reportErrorToDiskTask(ex.getMessageValue(),ex.code(),mb);
187
    } //end of catch
188
    watchdog.fileFinished();
189
  }
190
191
192
193
  /**
   * Get a valid block and ask to cancel the disk write task
   */
  void reportCancellationToDiskTask(){
194
    MemBlock* mb =m_mm.getFreeBlock();
195
196
    mb->m_fSeq = m_retrieveJob->tapeFile.fSeq;
    mb->m_fileid = m_retrieveJob->archiveFile.archiveFileID;
197
198
199
    //mark the block cancelled and push it (plus signal the end)
     mb->markAsCancelled();
     m_fifo.pushDataBlock(mb);
200
  }
201
202
203
private:
  /**
   * Do the actual report to the disk write task
204
205
   * @param errorMsg The error message we will give to the client
   * @param mb The mem block we will use
206
   */
207
  void reportErrorToDiskTask(const std::string& msg,int code,MemBlock* mb = NULL){
208
209
210
211
    //If we are not provided with a block, allocate it and
    // fill it up
    if (!mb) {
      mb=m_mm.getFreeBlock();
212
213
      mb->m_fSeq = m_retrieveJob->tapeFile.fSeq;
      mb->m_fileid = m_retrieveJob->archiveFile.archiveFileID;
214
215
    }
    //mark the block failed and push it (plus signal the end)
216
     mb->markAsFailed(msg,code);
217
218
219
     m_fifo.pushDataBlock(mb);
     m_fifo.pushDataBlock(NULL);
   }
220
221
  /** 
   * Open the file on the tape. In case of failure, log and throw
222
   * Copying the unique_ptr on the calling point will give us the ownership of the 
223
   * object.
224
   * @return if successful, return an unique_ptr on the ReadFile we want
225
   */
226
  std::unique_ptr<castor::tape::tapeFile::ReadFile> openReadFile(
227
228
229
230
231
  castor::tape::tapeFile::ReadSession & rs, castor::log::LogContext & lc){

    using castor::log::Param;
    typedef castor::log::LogContext::ScopedParam ScopedParam;

232
    std::unique_ptr<castor::tape::tapeFile::ReadFile> rf;
233
    try {
234
      rf.reset(new castor::tape::tapeFile::ReadFile(&rs, *m_retrieveJob));
235
236
237
238
239
240
241
242
243
244
      lc.log(LOG_DEBUG, "Successfully opened the tape file");
    } catch (castor::exception::Exception & ex) {
      // Log the error
      ScopedParam sp0(lc, Param("ErrorMessage", ex.getMessageValue()));
      ScopedParam sp1(lc, Param("ErrorCode", ex.code()));
      lc.log(LOG_ERR, "Failed to open tape file for reading");
      throw;
    }
    return rf;
  }
245
246
247
248
  
  /**
   * All we need to know about the file we are recalling
   */
249
  cta::RetrieveJob *m_retrieveJob;
250
251
252
253
  
  /**
   * The task (seen as a Y) that will consume all the blocks we read
   */
254
  DataConsumer & m_fifo;
255
256
257
258
  
  /**
   *  The MemoryManager from whom we get free memory blocks 
   */
259
260
  RecallMemoryManager & m_mm;

261
};
262
263
264
265
}
}
}
}
266