DiskReadTask.cpp 9.63 KB
Newer Older
1
2
/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2003-2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
17

Victor Kotlyar's avatar
Victor Kotlyar committed
18
#include "common/log/LogContext.hpp"
19
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
20
#include "common/Timer.hpp"
21

22
23
24
25
26
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {

27
28
29
30
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
DiskReadTask::DiskReadTask(DataConsumer & destination,
                           cta::ArchiveJob *archiveJob,
                           size_t numberOfBlock,
                           cta::threading::AtomicFlag& errorFlag)
  : m_nextTask(destination),
    m_archiveJob(archiveJob),
    m_numberOfBlock(numberOfBlock),
    m_errorFlag(errorFlag)
{
  // Keep a private copy of the job's file ID and source URL: logWithStat()
  // reports them from m_archiveJobCachedInfo rather than from the job itself.
  const auto & job = *m_archiveJob;
  m_archiveJobCachedInfo.fileId = job.archiveFile.archiveFileID;
  m_archiveJobCachedInfo.remotePath = job.srcURL;
}
39
40
41
42

//------------------------------------------------------------------------------
// DiskReadTask::execute
//------------------------------------------------------------------------------
43
void DiskReadTask::execute(cta::log::LogContext&  lc, cta::disk::DiskFileFactory & fileFactory,
44
    MigrationWatchDog & watchdog, const int threadID) {
Victor Kotlyar's avatar
Victor Kotlyar committed
45
46
  using cta::log::LogContext;
  using cta::log::Param;
47

Victor Kotlyar's avatar
Victor Kotlyar committed
48
49
  cta::utils::Timer localTime;
  cta::utils::Timer totalTime(localTime);
50
  size_t blockId=0;
51
  size_t migratingFileSize=m_archiveJob->archiveFile.fileSize;
52
  MemBlock* mb=NULL;
53
54
55
56
57
  // This out-of-try-catch variables allows us to record the stage of the 
  // process we're in, and to count the error if it occurs.
  // We will not record errors for an empty string. This will allow us to
  // prevent counting where error happened upstream.
  std::string currentErrorToCount = "";
58
59
60
61
  try{
    //we first check here to not even try to open the disk  if a previous task has failed
    //because the disk could the very reason why the previous one failed, 
    //so dont do the same mistake twice !
62
    checkMigrationFailing();
63
    currentErrorToCount = "Error_diskOpenForRead";
64
    std::unique_ptr<cta::disk::ReadFile> sourceFile(
65
      fileFactory.createReadFile(m_archiveJob->srcURL));
Victor Kotlyar's avatar
Victor Kotlyar committed
66
    cta::log::ScopedParamContainer URLcontext(lc);
67
    URLcontext.add("path", m_archiveJob->srcURL)
68
              .add("actualURL", sourceFile->URL());
69
    currentErrorToCount = "Error_diskFileToReadSizeMismatch";
70
    if(migratingFileSize != sourceFile->size()){
71
      throw cta::exception::Exception("Mismatch between size given by the client "
72
73
              "and the real one");
    }
74
    currentErrorToCount = "";
75
    
Victor Kotlyar's avatar
Victor Kotlyar committed
76
    m_stats.openingTime+=localTime.secs(cta::utils::Timer::resetCounter);
77
     
78
    LogContext::ScopedParam sp(lc, Param("fileId",m_archiveJob->archiveFile.archiveFileID));
Victor Kotlyar's avatar
Victor Kotlyar committed
79
    lc.log(cta::log::INFO,"Opened disk file for read");
80
81
82

    watchdog.addParameter(cta::log::Param("stillOpenFileForThread"+
      std::to_string((long long)threadID), sourceFile->URL()));
83
84
    
    while(migratingFileSize>0){
85

86
      checkMigrationFailing();
87
      
88
      mb = m_nextTask.getFreeBlock();
Victor Kotlyar's avatar
Victor Kotlyar committed
89
      m_stats.waitFreeMemoryTime+=localTime.secs(cta::utils::Timer::resetCounter);
90
      
91
      //set metadata and read the data
92
      mb->m_fileid = m_archiveJob->archiveFile.archiveFileID;
93
      mb->m_fileBlock = blockId++;
94
      
95
      currentErrorToCount = "Error_diskRead";
96
      migratingFileSize -= mb->m_payload.read(*sourceFile);
Victor Kotlyar's avatar
Victor Kotlyar committed
97
      m_stats.readWriteTime+=localTime.secs(cta::utils::Timer::resetCounter);
98
99
100

      m_stats.dataVolume += mb->m_payload.size();

101
102
103
      //we either read at full capacity (ie size=capacity, i.e. fill up the block),
      // or if there different, it should be the end of the file=> migratingFileSize 
      // should be 0. If it not, it is an error
104
      currentErrorToCount = "Error_diskUnexpectedSizeWhenReading";
105
      if(mb->m_payload.size() != mb->m_payload.totalCapacity() && migratingFileSize>0){
106
107
        std::string erroMsg = "Error while reading a file: memory block not filled up, but the file is not fully read yet";
        // Log the error
Victor Kotlyar's avatar
Victor Kotlyar committed
108
        cta::log::ScopedParamContainer params(lc);
109
110
111
        params.add("bytesInBlock", mb->m_payload.size())
              .add("BlockCapacity", mb->m_payload.totalCapacity())
              .add("BytesNotYetRead", migratingFileSize);
Victor Kotlyar's avatar
Victor Kotlyar committed
112
        lc.log(cta::log::ERR, "Error while reading a file: memory block not filled up, but the file is not fully read yet");
113
        // Mark the block as failed
114
        mb->markAsFailed(erroMsg,666);
115
116
117
        // Transmit to the tape write task, which will finish the session
        m_nextTask.pushDataBlock(mb);
        // Fail the disk side.
118
        throw cta::exception::Exception(erroMsg);
119
      }
120
      currentErrorToCount = "";
Victor Kotlyar's avatar
Victor Kotlyar committed
121
      m_stats.checkingErrorTime += localTime.secs(cta::utils::Timer::resetCounter);
122
      
123
124
125
126
      // We are done with the block, push it to the write task
      m_nextTask.pushDataBlock(mb);
      mb=NULL;
      
127
    } //end of while(migratingFileSize>0)
128
    m_stats.filesCount++;
129
    m_stats.totalTime = totalTime.secs();
130
131
132
    // We do not have delayed open like in disk writes, so time spent 
    // transferring equals total time.
    m_stats.transferTime = m_stats.totalTime;
Victor Kotlyar's avatar
Victor Kotlyar committed
133
    logWithStat(cta::log::INFO, "File successfully read from disk", lc);
134
  }
135
  catch(const castor::tape::tapeserver::daemon::ErrorFlag&){
136
   
Victor Kotlyar's avatar
Victor Kotlyar committed
137
    lc.log(cta::log::DEBUG,"DiskReadTask: a previous file has failed for migration "
138
    "Do nothing except circulating blocks");
139
    circulateAllBlocks(blockId,mb);
140
  }
141
  catch(const cta::exception::Exception& e){
142
143
144
145
    // Send the error for counting to the watchdog
    if (currentErrorToCount.size()) {
      watchdog.addToErrorCount(currentErrorToCount);
    }
146
147
148
149
150
151
152
153
154
155
156
157
    // We have to pump the blocks anyway, mark them failed and then pass them back 
    // to TapeWriteTask
    // Otherwise they would be stuck into TapeWriteTask free block fifo
    // If we got here we had some job to do so there shall be at least one
    // block either at hand or available.
    // The tape write task, upon reception of the failed block will mark the 
    // session as failed, hence signalling to the remaining disk read tasks to
    // cancel as nothing more will be written to tape.
    if (!mb) {
      mb=m_nextTask.getFreeBlock();
      ++blockId;
    }
158
    mb->markAsFailed(e.getMessageValue(), 666); // TODO - Drop error code
159
160
161
    m_nextTask.pushDataBlock(mb);
    mb=NULL;
    
162
163
164
165
166
    cta::log::ScopedParamContainer spc(lc);
    spc.add("blockID",blockId)
       .add("exceptionMessage", e.getMessageValue())
       .add("fileSize",m_archiveJob->archiveFile.fileSize);
    m_archiveJob->archiveFile.checksumBlob.addFirstChecksumToLog(spc);
Victor Kotlyar's avatar
Victor Kotlyar committed
167
    lc.log(cta::log::ERR,"Exception while reading a file");
168
169
    
    //deal here the number of mem block
170
    circulateAllBlocks(blockId,mb);
171
  } //end of catch
172
173
  watchdog.deleteParameter("stillOpenFileForThread"+
    std::to_string((long long)threadID));
174
}
175

176
177
178
//------------------------------------------------------------------------------
// DiskReadTask::circulateAllBlocks
//------------------------------------------------------------------------------
179
void DiskReadTask::circulateAllBlocks(size_t fromBlockId, MemBlock * mb){
180
181
  size_t blockId = fromBlockId;
  while(blockId<m_numberOfBlock) {
182
183
184
185
    if (!mb) {
      mb = m_nextTask.getFreeBlock();
      ++blockId;
    }
186
    mb->m_fileid = m_archiveJob->archiveFile.archiveFileID;
187
    mb->markAsCancelled();
188
    m_nextTask.pushDataBlock(mb);
189
    mb=NULL;
190
191
  } //end of while
}
192
193
194
195

//------------------------------------------------------------------------------
// logWithStat
//------------------------------------------------------------------------------  
Victor Kotlyar's avatar
Victor Kotlyar committed
196
197
void DiskReadTask::logWithStat(int level,const std::string& msg,cta::log::LogContext&  lc){
  cta::log::ScopedParamContainer params(lc);
198
199
     params.add("readWriteTime", m_stats.readWriteTime)
           .add("checksumingTime",m_stats.checksumingTime)
200
           .add("waitFreeMemoryTime",m_stats.waitFreeMemoryTime)
201
202
203
204
205
206
           .add("waitDataTime",m_stats.waitDataTime)
           .add("waitReportingTime",m_stats.waitReportingTime)
           .add("checkingErrorTime",m_stats.checkingErrorTime)
           .add("openingTime",m_stats.openingTime)
           .add("transferTime", m_stats.transferTime)
           .add("totalTime", m_stats.totalTime)
207
           .add("dataVolume", m_stats.dataVolume)
208
           .add("globalPayloadTransferSpeedMBps",
209
              m_stats.totalTime?1.0*m_stats.dataVolume/1000/1000/m_stats.totalTime:0)
210
           .add("diskPerformanceMBps",
211
              m_stats.transferTime?1.0*m_stats.dataVolume/1000/1000/m_stats.transferTime:0)
212
           .add("openRWCloseToTransferTimeRatio", 
213
              m_stats.transferTime?(m_stats.openingTime+m_stats.readWriteTime+m_stats.closingTime)/m_stats.transferTime:0.0)
214
           .add("fileId",m_archiveJobCachedInfo.fileId)
215
           .add("path",m_archiveJobCachedInfo.remotePath);
216
217
218
219
220
221
222
    lc.log(level,msg);
}

// Returns a copy of the statistics (m_stats) accumulated by this task.
const DiskStats DiskReadTask::getTaskStats() const{
  return m_stats;
}

223
224
}}}}