TapeReadTask.hpp 12.1 KB
Newer Older
1
2
/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2003-2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
17
18
19

#pragma once

20

David COME's avatar
David COME committed
21
#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
22
23
24
#include "castor/tape/tapeserver/daemon/RecallMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
25
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
26
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
27
#include "common/Timer.hpp"
28
#include "common/exception/Exception.hpp"
29

30
31
32
33
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
34
35
36
37
  /**
   * This class is in charge of 
   * 
   */
38
class TapeReadTask {
39
public:
40
  /**
Michael Davis's avatar
Michael Davis committed
41
   * Constructor
42
43
44
45
   * @param ftr The file being recalled. We acquire the ownership on the pointer
   * @param destination the task that will consume the memory blocks
   * @param mm The memory manager to get free block
   */
46
  TapeReadTask(cta::RetrieveJob *retrieveJob,
47
    DataConsumer & destination, RecallMemoryManager & mm): 
48
    m_retrieveJob(retrieveJob), m_fifo(destination), m_mm(mm) {}
49
    
50
51
52
53
54
55
56
    /**
     * @param rs the read session holding all we need to be able to read from the tape
     * @param lc the log context for .. logging purpose
     * The actual function that will do the job.
     * The main loop is :
     * Acquire a free memory block from the memory manager , fill it, push it 
     */
57
  void execute(castor::tape::tapeFile::ReadSession & rs,
Victor Kotlyar's avatar
Victor Kotlyar committed
58
    cta::log::LogContext & lc,RecallWatchDog& watchdog,
Victor Kotlyar's avatar
Victor Kotlyar committed
59
    TapeSessionStats & stats, cta::utils::Timer & timer) {
60

Victor Kotlyar's avatar
Victor Kotlyar committed
61
    using cta::log::Param;
Michael Davis's avatar
Michael Davis committed
62
63
64

    const bool isRepack = m_retrieveJob->m_dbJob->isRepack;
    const bool isVerifyOnly = m_retrieveJob->retrieveRequest.isVerifyOnly;
65
    // Set the common context for all the coming logs (file info)
Victor Kotlyar's avatar
Victor Kotlyar committed
66
    cta::log::ScopedParamContainer params(lc);
67
    params.add("fileId", m_retrieveJob->archiveFile.archiveFileID)
68
69
          .add("BlockId", m_retrieveJob->selectedTapeFile().blockId)
          .add("fSeq", m_retrieveJob->selectedTapeFile().fSeq)
70
          .add("dstURL", m_retrieveJob->retrieveRequest.dstURL)
Michael Davis's avatar
Michael Davis committed
71
72
          .add("isRepack", isRepack)
          .add("isVerifyOnly", isVerifyOnly);
73
    
74
75
    // We will clock the stats for the file itself, and eventually add those
    // stats to the session's.
76
    TapeSessionStats localStats;
77
    std::string LBPMode;
Victor Kotlyar's avatar
Victor Kotlyar committed
78
79
    cta::utils::Timer localTime;
    cta::utils::Timer totalTime(localTime);
80

81
82
    // Read the file and transmit it
    bool stillReading = true;
83
84
    //for counting how many mem blocks have used and how many tape blocks
    //(because one mem block can hold several tape blocks
85
86
    int fileBlock = 0;
    int tapeBlock = 0;
87
88
89
90
91
    // This out-of-try-catch variables allows us to record the stage of the 
    // process we're in, and to count the error if it occurs.
    // We will not record errors for an empty string. This will allow us to
    // prevent counting where error happened upstream.
    std::string currentErrorToCount = "";
92
93
    MemBlock* mb=NULL;
    try {
94
      currentErrorToCount = "Error_tapePositionForRead";
95
      std::unique_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
96
      LBPMode = rf->getLBPMode();
97
98
99
      // At that point we already read the header.
      localStats.headerVolume += TapeSessionStats::headerVolumePerFile;

Victor Kotlyar's avatar
Victor Kotlyar committed
100
      lc.log(cta::log::INFO, "Successfully positioned for reading");
Victor Kotlyar's avatar
Victor Kotlyar committed
101
      localStats.positionTime += timer.secs(cta::utils::Timer::resetCounter);
102
      watchdog.notifyBeginNewJob(m_retrieveJob->archiveFile.archiveFileID, m_retrieveJob->selectedTapeFile().fSeq);
Victor Kotlyar's avatar
Victor Kotlyar committed
103
      localStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
104
      currentErrorToCount = "Error_tapeReadData";
Michael Davis's avatar
Michael Davis committed
105
106
      auto checksum_adler32 = Payload::zeroAdler32();
      cta::checksum::ChecksumBlob tapeReadChecksum;
107
108
109
      while (stillReading) {
        // Get a memory block and add information to its metadata
        mb=m_mm.getFreeBlock();
Victor Kotlyar's avatar
Victor Kotlyar committed
110
        localStats.waitFreeMemoryTime += timer.secs(cta::utils::Timer::resetCounter);
111
        
112
        mb->m_fSeq = m_retrieveJob->selectedTapeFile().fSeq;
113
        mb->m_fileBlock = fileBlock++;
114
        mb->m_fileid = m_retrieveJob->retrieveRequest.archiveFileID;
115
116
117
118
119
120
121
        mb->m_tapeFileBlock = tapeBlock;
        mb->m_tapeBlockSize = rf->getBlockSize();
        try {
          // Fill up the memory block with tape block
          // append conveniently returns false when there will not be more space
          // for an extra tape block, and throws an exception if we reached the
          // end of file. append() also protects against reading too big tape blocks.
122
123
124
          while (mb->m_payload.append(*rf)) {
            tapeBlock++;
          }
125
        } catch (const cta::exception::EndOfFile&) {
126
127
128
          // append() signaled the end of the file.
          stillReading = false;
        }
Michael Davis's avatar
Michael Davis committed
129
        checksum_adler32 = mb->m_payload.adler32(checksum_adler32);
Victor Kotlyar's avatar
Victor Kotlyar committed
130
        localStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
131
132
        auto blockSize = mb->m_payload.size();
        localStats.dataVolume += blockSize;
133
134
	if(isRepack){
	  localStats.repackBytesCount += blockSize;
Michael Davis's avatar
Michael Davis committed
135
136
        } else if(isVerifyOnly) {
          localStats.verifiedBytesCount += blockSize;
Michael Davis's avatar
Michael Davis committed
137
138
          // Don't write the file to disk
          mb->markAsVerifyOnly();
139
140
141
	} else {
	  localStats.userBytesCount += blockSize;
	}
Michael Davis's avatar
Michael Davis committed
142
143
144
145
        // If we reached the end of the file, validate the checksum (throws an exception on bad checksum)
        if(!stillReading) {
          tapeReadChecksum.insert(cta::checksum::ADLER32, checksum_adler32);
          m_retrieveJob->archiveFile.checksumBlob.validate(tapeReadChecksum);
Michael Davis's avatar
Michael Davis committed
146
        }
147
148
        // Pass the block to the disk write task
        m_fifo.pushDataBlock(mb);
149
        mb=NULL;
150
        watchdog.notify(blockSize);
Victor Kotlyar's avatar
Victor Kotlyar committed
151
        localStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
152
      } //end of while(stillReading)
Michael Davis's avatar
Michael Davis committed
153
      // We have to signal the end of the tape read to the disk write task.
154
      m_fifo.pushDataBlock(NULL);
155
      // Log the successful transfer
156
      localStats.totalTime = localTime.secs();
157
158
159
160
      // Count the trailer size
      localStats.headerVolume += TapeSessionStats::trailerVolumePerFile;
      // We now transmitted one file:
      localStats.filesCount++;
161
162
      if(isRepack){
	localStats.repackFilesCount++;
Michael Davis's avatar
Michael Davis committed
163
164
      } else if(isVerifyOnly) {
        localStats.verifiedFilesCount++;
165
166
167
      } else {
	localStats.userFilesCount++;
      }
168
      params.add("positionTime", localStats.positionTime)
169
            .add("readWriteTime", localStats.readWriteTime)
170
171
            .add("waitFreeMemoryTime",localStats.waitFreeMemoryTime)
            .add("waitReportingTime",localStats.waitReportingTime)
172
            .add("transferTime",localStats.transferTime())
173
            .add("totalTime", localStats.totalTime)
174
            .add("dataVolume",localStats.dataVolume)
175
            .add("headerVolume",localStats.headerVolume)
176
            .add("driveTransferSpeedMBps",
177
178
                    localStats.totalTime?(1.0*localStats.dataVolume+1.0*localStats.headerVolume)
                     /1000/1000/localStats.totalTime:0)
179
            .add("payloadTransferSpeedMBps",
180
                     localStats.totalTime?1.0*localStats.dataVolume/1000/1000/localStats.totalTime:0)
181
182
183
184
            .add("LBPMode", LBPMode)
	    .add("repackFilesCount",localStats.repackFilesCount)
	    .add("repackBytesCount",localStats.repackBytesCount)
	    .add("userFilesCount",localStats.userFilesCount)
Michael Davis's avatar
Michael Davis committed
185
186
	    .add("userBytesCount",localStats.userBytesCount)
	    .add("verifiedFilesCount",localStats.verifiedFilesCount)
Michael Davis's avatar
Michael Davis committed
187
188
189
	    .add("verifiedBytesCount",localStats.verifiedBytesCount)
            .add("checksumType", "ADLER32")
            .add("checksumValue", cta::checksum::ChecksumBlob::ByteArrayToHex(tapeReadChecksum.at(cta::checksum::ADLER32)));
Victor Kotlyar's avatar
Victor Kotlyar committed
190
      lc.log(cta::log::INFO, "File successfully read from tape");
191
192
193
      // Add the local counts to the session's
      stats.add(localStats);
    } //end of try
194
    catch (const cta::exception::Exception & ex) {
Michael Davis's avatar
Michael Davis committed
195
196
      // We end up here because:
      //-- openReadFile brought us here (can't position to the file)
197
      //-- m_payload.append brought us here (error while reading the file)
Michael Davis's avatar
Michael Davis committed
198
      //-- checksum validation failed (after reading the last block from tape)
199
200
201
202
      // Record the error in the watchdog
      if (currentErrorToCount.size()) {
        watchdog.addToErrorCount(currentErrorToCount);
      }
203
204
      // This is an error case. Log and signal to the disk write task
      { 
Victor Kotlyar's avatar
Victor Kotlyar committed
205
206
        cta::log::LogContext::ScopedParam sp0(lc, Param("fileBlock", fileBlock));
        cta::log::LogContext::ScopedParam sp1(lc, Param("ErrorMessage", ex.getMessageValue()));
Michael Davis's avatar
Michael Davis committed
207
        lc.log(cta::log::ERR, "Error reading a file in TapeReadFileTask");
208
209
      }
      {
Victor Kotlyar's avatar
Victor Kotlyar committed
210
211
        cta::log::LogContext lc2(lc.logger());
        lc2.logBacktrace(cta::log::ERR, ex.backtrace());
212
213
      }
      
214
215
      // mb might or might not be allocated at this point, but 
      // reportErrorToDiskTask will deal with the allocation if required.
216
      reportErrorToDiskTask(ex.getMessageValue(),666,mb); // TODO - Remove error code
217
    } //end of catch
218
    watchdog.fileFinished();
219
  }
220
221
222
223
  /**
   * Get a valid block and ask to cancel the disk write task
   */
  void reportCancellationToDiskTask(){
224
    MemBlock* mb =m_mm.getFreeBlock();
225
226
    mb->m_fSeq = m_retrieveJob->selectedTapeFile().fSeq;
    mb->m_fileid = m_retrieveJob->retrieveRequest.archiveFileID;
227
228
229
    //mark the block cancelled and push it (plus signal the end)
     mb->markAsCancelled();
     m_fifo.pushDataBlock(mb);
230
  }
231
232
233
private:
  /**
   * Do the actual report to the disk write task
234
235
   * @param errorMsg The error message we will give to the client
   * @param mb The mem block we will use
236
   */
237
  void reportErrorToDiskTask(const std::string& msg,int code,MemBlock* mb = NULL){
238
239
240
241
    //If we are not provided with a block, allocate it and
    // fill it up
    if (!mb) {
      mb=m_mm.getFreeBlock();
242
243
      mb->m_fSeq = m_retrieveJob->selectedTapeFile().fSeq;
      mb->m_fileid = m_retrieveJob->retrieveRequest.archiveFileID;
244
245
    }
    //mark the block failed and push it (plus signal the end)
246
     mb->markAsFailed(msg,code);
247
248
249
     m_fifo.pushDataBlock(mb);
     m_fifo.pushDataBlock(NULL);
   }
250
251
  /** 
   * Open the file on the tape. In case of failure, log and throw
252
   * Copying the unique_ptr on the calling point will give us the ownership of the 
253
   * object.
254
   * @return if successful, return an unique_ptr on the ReadFile we want
255
   */
256
  std::unique_ptr<castor::tape::tapeFile::ReadFile> openReadFile(
Victor Kotlyar's avatar
Victor Kotlyar committed
257
  castor::tape::tapeFile::ReadSession & rs, cta::log::LogContext & lc){
258

Victor Kotlyar's avatar
Victor Kotlyar committed
259
260
    using cta::log::Param;
    typedef cta::log::LogContext::ScopedParam ScopedParam;
261

262
    std::unique_ptr<castor::tape::tapeFile::ReadFile> rf;
263
    try {
264
      rf.reset(new castor::tape::tapeFile::ReadFile(&rs, *m_retrieveJob));
Victor Kotlyar's avatar
Victor Kotlyar committed
265
      lc.log(cta::log::DEBUG, "Successfully opened the tape file");
266
    } catch (cta::exception::Exception & ex) {
267
268
      // Log the error
      ScopedParam sp0(lc, Param("ErrorMessage", ex.getMessageValue()));
Victor Kotlyar's avatar
Victor Kotlyar committed
269
      lc.log(cta::log::ERR, "Failed to open tape file for reading");
270
271
272
273
      throw;
    }
    return rf;
  }
274
275
276
277
  
  /**
   * All we need to know about the file we are recalling
   */
278
  cta::RetrieveJob *m_retrieveJob;
279
280
281
282
  
  /**
   * The task (seen as a Y) that will consume all the blocks we read
   */
283
  DataConsumer & m_fifo;
284
285
286
287
  
  /**
   *  The MemoryManager from whom we get free memory blocks 
   */
288
289
  RecallMemoryManager & m_mm;

290
};
291
292
293
294
}
}
}
}
295