DiskWriteTask.cpp 12 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

Victor Kotlyar's avatar
Victor Kotlyar committed
24
#include "common/log/LogContext.hpp"
25
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
26
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
27
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
28
#include "common/Timer.hpp"
29

30
31
32
33
34
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
  
Eric Cano's avatar
Eric Cano committed
35
36
37
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
38
/**
 * Builds a disk write task for one retrieve job.
 *
 * @param retrieveJob The retrieve job this task will write to disk; it is
 *                    stored in m_retrieveJob (presumably taking ownership,
 *                    since execute() later moves it to the reporter -- confirm
 *                    against the header).
 * @param mm          The memory manager to which consumed memory blocks are
 *                    handed back.
 */
DiskWriteTask::DiskWriteTask(cta::RetrieveJob *retrieveJob, RecallMemoryManager& mm)
  : m_retrieveJob(retrieveJob),
    m_memManager(mm) {
}
40

Eric Cano's avatar
Eric Cano committed
41
42
43
//------------------------------------------------------------------------------
// DiskWriteTask::execute
//------------------------------------------------------------------------------
Victor Kotlyar's avatar
Victor Kotlyar committed
44
bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&  lc,
45
    cta::disk::DiskFileFactory & fileFactory, RecallWatchDog & watchdog,
46
    const int threadID) {
Victor Kotlyar's avatar
Victor Kotlyar committed
47
48
  using cta::log::LogContext;
  using cta::log::Param;
Victor Kotlyar's avatar
Victor Kotlyar committed
49
50
51
  cta::utils::Timer localTime;
  cta::utils::Timer totalTime(localTime);
  cta::utils::Timer transferTime(localTime);
Victor Kotlyar's avatar
Victor Kotlyar committed
52
  cta::log::ScopedParamContainer URLcontext(lc);
53
  URLcontext.add("fileId",m_retrieveJob->retrieveRequest.archiveFileID)
54
55
            .add("dstURL", m_retrieveJob->retrieveRequest.dstURL)
            .add("fSeq",m_retrieveJob->selectedTapeFile().fSeq);
56
57
  m_stats.dstURL = m_retrieveJob->retrieveRequest.dstURL;
  m_stats.fileId = m_retrieveJob->retrieveRequest.archiveFileID;
58
59
60
61
62
  // This out-of-try-catch variables allows us to record the stage of the 
  // process we're in, and to count the error if it occurs.
  // We will not record errors for an empty string. This will allow us to
  // prevent counting where error happened upstream.
  std::string currentErrorToCount = "";
Michael Davis's avatar
Michael Davis committed
63
  bool isVerifyOnly(false);
Eric Cano's avatar
Eric Cano committed
64
  try{
65
    currentErrorToCount = "";
66
67
    // Placeholder for the disk file. We will open it only
    // after getting a first correct memory block.
68
    std::unique_ptr<cta::disk::WriteFile> writeFile;
69
    
Eric Cano's avatar
Eric Cano committed
70
71
72
73
    int blockId  = 0;
    unsigned long checksum = Payload::zeroAdler32();
    while(1) {
      if(MemBlock* const mb = m_fifo.pop()) {
Victor Kotlyar's avatar
Victor Kotlyar committed
74
        m_stats.waitDataTime+=localTime.secs(cta::utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
75
        AutoReleaseBlock<RecallMemoryManager> releaser(mb,m_memManager);
Michael Davis's avatar
Michael Davis committed
76
77
78
79
80
        if(mb->isVerifyOnly()) {
          // For verifyOnly, there is no disk file to write. Ignore the memory block and continue.
          isVerifyOnly = true;
          continue;
        } else if(mb->isCanceled()) {
81
          // If the tape side got canceled, we report nothing and count
82
          // it as a success.
Victor Kotlyar's avatar
Victor Kotlyar committed
83
          lc.log(cta::log::DEBUG, "File transfer canceled");
84
85
          return true;
        }
86
87
        
        //will throw (thus exiting the loop) if something is wrong
88
        checkErrors(mb,blockId,lc);
Victor Kotlyar's avatar
Victor Kotlyar committed
89
        m_stats.checkingErrorTime += localTime.secs(cta::utils::Timer::resetCounter);
90
91
92
        // If we got that far on the first pass, it's now good enough to open
        // the disk file for writing...
        if (!writeFile.get()) {
Victor Kotlyar's avatar
Victor Kotlyar committed
93
          lc.log(cta::log::DEBUG, "About to open disk file for writing");
94
          // Synchronise the counter with the open time counter.
95
          currentErrorToCount = "Error_diskOpenForWrite";
96
          transferTime = localTime;
97
          writeFile.reset(fileFactory.createWriteFile(m_retrieveJob->retrieveRequest.dstURL));
98
          URLcontext.add("actualURL", writeFile->URL());
Victor Kotlyar's avatar
Victor Kotlyar committed
99
          lc.log(cta::log::INFO, "Opened disk file for writing");
Victor Kotlyar's avatar
Victor Kotlyar committed
100
          m_stats.openingTime+=localTime.secs(cta::utils::Timer::resetCounter);
101
102
          watchdog.addParameter(cta::log::Param("stillOpenFileForThread"+
            std::to_string((long long)threadID), writeFile->URL()));
103
        }
104
        
105
        // Write the data.
106
        currentErrorToCount = "Error_diskWrite";
107
        m_stats.dataVolume+=mb->m_payload.size();
108
109
        if (mb->m_payload.size())
          mb->m_payload.write(*writeFile);
Victor Kotlyar's avatar
Victor Kotlyar committed
110
        m_stats.readWriteTime+=localTime.secs(cta::utils::Timer::resetCounter);
111
        
Eric Cano's avatar
Eric Cano committed
112
        checksum = mb->m_payload.adler32(checksum);
Victor Kotlyar's avatar
Victor Kotlyar committed
113
        m_stats.checksumingTime+=localTime.secs(cta::utils::Timer::resetCounter);
114
        currentErrorToCount = "";
115
       
Eric Cano's avatar
Eric Cano committed
116
        blockId++;
Michael Davis's avatar
Michael Davis committed
117
118
119
120
121
        //end if block non NULL
      } else if(isVerifyOnly) {
        // No file to close, we are done
        break;
      } else {
122
123
124
        //close has to be explicit, because it may throw. 
        //A close is done  in WriteFile's destructor, but it may lead to some 
        //silent data loss
125
        currentErrorToCount = "Error_diskCloseAfterWrite";
126
127
128
        // Set the checksum on the server (actually needed only for Rados striper
        // noop in other cases).
        writeFile->setChecksum(checksum);
129
        writeFile->close();
Victor Kotlyar's avatar
Victor Kotlyar committed
130
        m_stats.closingTime +=localTime.secs(cta::utils::Timer::resetCounter);
131
        m_stats.filesCount++;
132
        break;
133
        currentErrorToCount = "";
134
      }
Eric Cano's avatar
Eric Cano committed
135
    } //end of while(1)
136
    m_retrieveJob->transferredSize = m_stats.dataVolume;
137
    m_retrieveJob->transferredChecksumType = "ADLER32";
138
139
140
141
142
    { 
      std::stringstream cs;
      cs << std::hex << std::nouppercase << std::setfill('0') << std::setw(8) << (uint32_t)checksum;
      m_retrieveJob->transferredChecksumValue = cs.str();
    }
143
    reporter.reportCompletedJob(std::move(m_retrieveJob));
Victor Kotlyar's avatar
Victor Kotlyar committed
144
    m_stats.waitReportingTime+=localTime.secs(cta::utils::Timer::resetCounter);
145
    m_stats.transferTime = transferTime.secs();
146
    m_stats.totalTime = totalTime.secs();
Michael Davis's avatar
Michael Davis committed
147
148
    logWithStat(cta::log::INFO, isVerifyOnly ? "File successfully verified" : "File successfully transfered to disk", lc);
    watchdog.deleteParameter("stillOpenFileForThread" + std::to_string((long long)threadID));
149
    //everything went well, return true
Eric Cano's avatar
Eric Cano committed
150
151
    return true;
  } //end of try
152
  catch(const cta::exception::Exception& e){
Eric Cano's avatar
Eric Cano committed
153
    /*
154
155
156
157
158
     *We might end up there because ;
     * -- WriteFile failed 
     * -- A desynchronization between tape read and disk write
     * -- An error in tape read
     * -- An error while writing the file
Eric Cano's avatar
Eric Cano committed
159
     */
160
161
162
    
    //there might still be some blocks into m_fifo
    // We need to empty it
Eric Cano's avatar
Eric Cano committed
163
164
    releaseAllBlock();
    
165
166
167
168
169
    // Propagate the error to the watchdog
    if (currentErrorToCount.size()) {
      watchdog.addToErrorCount(currentErrorToCount);
    }
    
Victor Kotlyar's avatar
Victor Kotlyar committed
170
    m_stats.waitReportingTime+=localTime.secs(cta::utils::Timer::resetCounter);
Victor Kotlyar's avatar
Victor Kotlyar committed
171
    cta::log::ScopedParamContainer params(lc);
172
    params.add("errorMessage", e.getMessageValue());
Michael Davis's avatar
Michael Davis committed
173
    logWithStat(cta::log::ERR, isVerifyOnly ? "File verification failed" : "File writing to disk failed", lc);
Victor Kotlyar's avatar
Victor Kotlyar committed
174
    lc.logBacktrace(cta::log::ERR, e.backtrace());
175
    reporter.reportFailedJob(std::move(m_retrieveJob), e);
176

177
178
179
    watchdog.deleteParameter("stillOpenFileForThread"+
      std::to_string((long long)threadID));

180
    //got an exception, return false
Eric Cano's avatar
Eric Cano committed
181
182
183
184
185
186
187
188
    return false;
  }
}

//------------------------------------------------------------------------------
// DiskWriteTask::getFreeBlock
//------------------------------------------------------------------------------
/**
 * Not supported by this task: always throws.
 *
 * @return never returns normally.
 * @throws cta::exception::Exception unconditionally.
 */
MemBlock *DiskWriteTask::getFreeBlock() {
  throw cta::exception::Exception("DiskWriteTask::getFreeBlock should not be called");
}

//------------------------------------------------------------------------------
// DiskWriteTask::pushDataBlock
//------------------------------------------------------------------------------
/**
 * Queues a memory block for execute() to consume.
 * m_producerProtection is held for the duration of the push.
 *
 * @param mb The block to append to m_fifo. execute() treats a null pointer
 *           popped from the fifo as the end-of-data marker.
 */
void DiskWriteTask::pushDataBlock(MemBlock *mb) {
  cta::threading::MutexLocker producerLock(m_producerProtection);
  m_fifo.push(mb);
}

//------------------------------------------------------------------------------
// DiskWriteTask::~DiskWriteTask
//------------------------------------------------------------------------------
/**
 * Destructor. Acquires m_producerProtection before the object is torn down
 * (presumably so that destruction waits for a producer still inside
 * pushDataBlock() -- confirm against the callers).
 */
DiskWriteTask::~DiskWriteTask() {
  // The locker is a named object with side effects in its constructor and
  // destructor, so it cannot be optimized away: no volatile qualifier needed.
  cta::threading::MutexLocker ml(m_producerProtection);
}

//------------------------------------------------------------------------------
// DiskWriteTask::releaseAllBlock
//------------------------------------------------------------------------------
void DiskWriteTask::releaseAllBlock(){
  while(1){
    if(MemBlock* mb=m_fifo.pop())
      AutoReleaseBlock<RecallMemoryManager> release(mb,m_memManager);
    else 
      break;
216
  }
Eric Cano's avatar
Eric Cano committed
217
}
218

219
220
221
//------------------------------------------------------------------------------
// checkErrors
//------------------------------------------------------------------------------  
Victor Kotlyar's avatar
Victor Kotlyar committed
222
223
  /**
   * Validates a memory block before it is written to disk.
   *
   * A block is accepted when it belongs to the archive file of this task's
   * retrieve job, carries the expected block index and is not flagged as
   * failed. Otherwise the problem is logged at ERR level, with the expected
   * and received identifiers attached, and an exception is thrown.
   *
   * @param mb      The block to validate.
   * @param blockId The block index expected at this point of the file.
   * @param lc      The log context used to report a rejected block.
   * @throws cta::exception::Exception carrying the block's own error message
   *         when it is flagged as failed, or a mismatch message otherwise.
   */
  void DiskWriteTask::checkErrors(MemBlock* mb,int blockId,cta::log::LogContext&  lc){
    using namespace cta::log;
    const bool blockIsAsExpected =
        m_retrieveJob->retrieveRequest.archiveFileID == mb->m_fileid
        && blockId == mb->m_fileBlock
        && !mb->isFailed();
    if (blockIsAsExpected) {
      return;
    }

    // Attach the diagnostic values to the log context for the error message.
    LogContext::ScopedParam diagnostics[] = {
      LogContext::ScopedParam(lc, Param("received_archiveFileID", mb->m_fileid)),
      LogContext::ScopedParam(lc, Param("expected_NSBLOCKId", blockId)),
      LogContext::ScopedParam(lc, Param("received_NSBLOCKId", mb->m_fileBlock)),
      LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed()))
    };
    tape::utils::suppresUnusedVariable(diagnostics);

    // Propagation of an error code is temporarily disabled (see the comment in
    // MemBlock); only the message is reported.
    std::string reason;
    if (!mb->isFailed()) {
      reason = "Mismatch between expected and received fileid or blockid";
    } else {
      reason = mb->errorMsg();
    }
    lc.log(cta::log::ERR, reason);
    throw cta::exception::Exception(reason);
  }

250
251
252
253
254
255
256
257
258
//------------------------------------------------------------------------------
// getTaskStats
//------------------------------------------------------------------------------  
/**
 * Gives access to the statistics gathered by this task.
 *
 * @return a copy of m_stats.
 */
const DiskStats DiskWriteTask::getTaskStats() const {
  return m_stats;
}
//------------------------------------------------------------------------------
// logWithStat
//------------------------------------------------------------------------------  
Victor Kotlyar's avatar
Victor Kotlyar committed
259
260
void DiskWriteTask::logWithStat(int level,const std::string& msg,cta::log::LogContext&  lc){
  cta::log::ScopedParamContainer params(lc);
261
262
263
264
265
266
267
268
269
     params.add("readWriteTime", m_stats.readWriteTime)
           .add("checksumingTime",m_stats.checksumingTime)
           .add("waitDataTime",m_stats.waitDataTime)
           .add("waitReportingTime",m_stats.waitReportingTime)
           .add("checkingErrorTime",m_stats.checkingErrorTime)
           .add("openingTime",m_stats.openingTime)
           .add("closingTime",m_stats.closingTime)
           .add("transferTime", m_stats.transferTime)
           .add("totalTime", m_stats.totalTime)
270
           .add("dataVolume", m_stats.dataVolume)
271
           .add("globalPayloadTransferSpeedMBps",
272
              m_stats.totalTime?1.0*m_stats.dataVolume/1000/1000/m_stats.totalTime:0)
273
           .add("diskPerformanceMBps",
274
              m_stats.transferTime?1.0*m_stats.dataVolume/1000/1000/m_stats.transferTime:0)
275
           .add("openRWCloseToTransferTimeRatio", 
276
              m_stats.transferTime?(m_stats.openingTime+m_stats.readWriteTime+m_stats.closingTime)/m_stats.transferTime:0.0)
277
278
           .add("fileId",m_stats.fileId)
           .add("dstURL",m_stats.dstURL);
279
280
    lc.log(level,msg);
}
281
282
}}}}