MigrationReportPacker.cpp 13.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
25
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
26
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
27
#include "serrno.h"
28
29

#include <memory>
30
#include <numeric>
31
#include <cstdio>
32

33
namespace{
34
  struct failedMigrationRecallResult : public castor::exception::Exception{
35
36
37
38
39
40
    failedMigrationRecallResult(const std::string& s): Exception(s){}
  };
}
using castor::log::LogContext;
using castor::log::Param;

41
42
43
44
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
45
46
47
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
48
MigrationReportPacker::MigrationReportPacker(cta::ArchiveMount *archiveMount,
  castor::log::LogContext lc):
  // The log context is handed over to the generic report packer base.
  ReportPackerInterface<detail::Migration>(lc),
  // The worker thread keeps a reference back to this packer.
  m_workerThread(*this),
  // No error seen yet, and the worker loop is allowed to run.
  m_errorHappened(false),
  m_continue(true),
  // The archive mount pointer is stored as given.
  m_archiveMount(archiveMount)
{
}
53
54
55
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
56
MigrationReportPacker::~MigrationReportPacker(){
  // The only thing done here is to take the producer mutex for the duration
  // of the destructor body and release it again.
  // NOTE(review): presumably this waits for a producer that is still inside
  // one of the report*() methods -- confirm.
  castor::server::MutexLocker producerGuard(&m_producterProtection);
}
59
60
61
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------ 
David COME's avatar
David COME committed
62
void MigrationReportPacker::reportCompletedJob(
63
64
std::unique_ptr<cta::ArchiveJob> successfulArchiveJob) {
  std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulArchiveJob)));
65
  castor::server::MutexLocker ml(&m_producterProtection);
66
67
  m_fifo.push(rep.release());
}
68
69
70
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------ 
71
void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
72
73
        const castor::exception::Exception &ex){
  std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob),ex));
74
  castor::server::MutexLocker ml(&m_producterProtection);
75
76
  m_fifo.push(rep.release());
}
77
78
//------------------------------------------------------------------------------
//reportFlush
79
//------------------------------------------------------------------------------
80
void MigrationReportPacker::reportFlush(drive::compressionStats compressStats){
  // Queue a flush report carrying the given compression statistics.
  // The report is created and pushed while the producer lock is held.
  castor::server::MutexLocker producerGuard(&m_producterProtection);
  Report * const flushReport = new ReportFlush(compressStats);
  m_fifo.push(flushReport);
}
84
85
86
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------ 
87
void MigrationReportPacker::reportEndOfSession() {
88
    castor::server::MutexLocker ml(&m_producterProtection);
89
90
    m_fifo.push(new ReportEndofSession());
}
91
92
93
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------ 
94
void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int errorCode){
95
  castor::server::MutexLocker ml(&m_producterProtection);
96
  m_fifo.push(new ReportEndofSessionWithErrors(msg,errorCode));
97
}
98
99
100
101
102
103
104
105
106
//------------------------------------------------------------------------------
//synchronousReportEndWithErrors
//------------------------------------------------------------------------------ 
void MigrationReportPacker::synchronousReportEndWithErrors(const std::string msg, int errorCode){
  // We create the report task here and excute it immediately instead of posting
  // it to a queue.
  ReportEndofSessionWithErrors rep(msg,errorCode);
  rep.execute(*this);
}
107

108
//------------------------------------------------------------------------------
109
110
//ReportSuccessful::execute
//------------------------------------------------------------------------------
111
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& reportPacker){
  // Hand the job over to the packer's queue of successful jobs.
  // ReportFlush::execute later completes the jobs held in that queue.
  auto& pendingJobs = reportPacker.m_successfulArchiveJobs;
  pendingJobs.push(std::move(m_successfulArchiveJob));
}
114
//------------------------------------------------------------------------------
115
116
//ReportFlush::computeCompressedSize
//------------------------------------------------------------------------------
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//void MigrationReportPacker::ReportFlush::computeCompressedSize(
//std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator beg,
//std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator end
//)
//{
//  //lets pray for C++11 lamba and std accumulate
//  uint64_t rawSize = 0;
//  for(std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator  it = beg;
//          it != end ;++it){
//            rawSize+=(*it)->fileSize();
//  }
//
//  uint64_t nbByteWritenWithCompression = m_compressStats.toTape;  
//  //we dont want compressionRatio to be equal to zero not to have a division by zero
//  double compressionRatio = nbByteWritenWithCompression>0 && rawSize >0 ? 
//    1.0*nbByteWritenWithCompression/rawSize : 1.;
//  
//  for(std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator  it = beg;
//          it != end ;++it){
//    const uint64_t compressedFileSize =
//    static_cast<uint64_t>((*it)->fileSize() * compressionRatio);
//
//    // The compressed file size should never be reported as being less than 1
//    // byte
//    uint64_t validCompressedFileSize = 0 < compressedFileSize ? compressedFileSize : 1;
//    
//    (*it)->setCompressedFileSize(validCompressedFileSize);
//  }
//}
Daniele Kruse's avatar
Daniele Kruse committed
146

147
//------------------------------------------------------------------------------
148
149
//ReportFlush::execute
//------------------------------------------------------------------------------
150
// Completes every job currently held in the packer's queue of successful
// archive jobs.
//
// Throws failedMigrationRecallResult when a castor::exception::Exception is
// caught while completing the jobs.
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPacker){
  if(reportPacker.m_errorHappened){
    // This is an abnormal situation: we should never flush after an error!
    // Only the alert below is emitted in this case.
    reportPacker.m_lc.log(LOG_ALERT,"Received a flush after an error: sending file errors to client");
    return;
  }

  // We can receive double flushes when the periodic flush happens
  // right before the end of session (which triggers also a flush)
  // We refrain from sending an empty report to the client in this case.
  if (reportPacker.m_successfulArchiveJobs.empty()) {
    reportPacker.m_lc.log(LOG_INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing.");
    return;
  }

  try{
    while(!reportPacker.m_successfulArchiveJobs.empty()) {
      std::unique_ptr<cta::ArchiveJob> job(std::move(reportPacker.m_successfulArchiveJobs.front()));
      // Remove the (now moved-from) slot before completing the job, so that
      // a throwing complete() cannot leave a null pointer at the head of the
      // queue.
      reportPacker.m_successfulArchiveJobs.pop();
      job->complete();
    }
    reportPacker.m_lc.log(LOG_INFO,"Reported to the client that a batch of files was written on tape");
  }
  catch(const castor::exception::Exception& e){
    // Attach the exception details to the log context for the error line
    // logged below.
    LogContext::ScopedParam sp[]={
      LogContext::ScopedParam(reportPacker.m_lc, Param("exceptionCode",e.code())),
      LogContext::ScopedParam(reportPacker.m_lc, Param("exceptionMessageValue", e.getMessageValue())),
      LogContext::ScopedParam(reportPacker.m_lc, Param("exceptionWhat",e.what()))
    };
    tape::utils::suppresUnusedVariable(sp);

    const std::string msg_error="An exception was caught trying to call reportMigrationResults";
    reportPacker.m_lc.log(LOG_ERR,msg_error);
    // Re-signalled with a dedicated type so the worker thread can tell this
    // failure apart and try to close the session.
    throw failedMigrationRecallResult(msg_error);
  }
}
185
186
187
188

//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
189
190
void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& reportPacker){
  // Whether a file error was recorded earlier in the session decides the log
  // level, the log parameters and the status handed to the watchdog.
  const bool sessionHadErrors = reportPacker.m_errorHappened;

  // The archive mount is completed in both cases.
  reportPacker.m_archiveMount->complete();
  {
    // Parameters added to this container stay attached to the log context
    // until the end of this scope.
    log::ScopedParamContainer sessionParams(reportPacker.m_lc);
    if(sessionHadErrors) {
      // We have some errors
      sessionParams.add("errorMessage", "Previous file errors")
        .add("errorCode", SEINTERNAL);
      reportPacker.m_lc.log(LOG_ERR,"Reported end of session with error to client due to previous file errors");
    } else {
      reportPacker.m_lc.log(LOG_INFO,"Reported end of session to client");
    }

    if(reportPacker.m_watchdog) {
      reportPacker.m_watchdog->addParameter(
        log::Param("status", sessionHadErrors ? "failure" : "success"));
      // We have a race condition here between the processing of this message by
      // the initial process and the printing of the end-of-session log, triggered
      // by the end our process. To delay the latter, we sleep half a second here.
      usleep(500*1000);
    }
  }

  // Tells the worker thread loop to stop.
  reportPacker.m_continue=false;
}
219

220
221
222
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
223
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& reportPacker){
  if(!reportPacker.m_errorHappened) {
    // No file error was recorded before this report.
    const std::string sessionErrorText("Reported end of session with error to client");
    // As a measure of safety we censor any session error which is not ENOSPC into
    // SEINTERNAL. ENOSPC is the only one interpreted by the tape gateway.
    if (ENOSPC != m_errorCode) {
      m_errorCode = SEINTERNAL;
    }
    reportPacker.m_archiveMount->complete();
    reportPacker.m_lc.log(LOG_INFO,sessionErrorText);
  } else {
    // File errors were already recorded: complete the mount and log this
    // report's message and error code.
    reportPacker.m_archiveMount->complete();
    log::ScopedParamContainer errorParams(reportPacker.m_lc);
    errorParams.add("errorMessage", m_message)
      .add("errorCode", m_errorCode);
    reportPacker.m_lc.log(LOG_INFO,"Reported end of session with error to client after sending file errors");
  }

  if(reportPacker.m_watchdog) {
    // An ENOSPC error code is handed to the watchdog as "success"; any other
    // code as "failure".
    const char * const sessionStatus = (ENOSPC == m_errorCode) ? "success" : "failure";
    reportPacker.m_watchdog->addParameter(log::Param("status", sessionStatus));
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end our process. To delay the latter, we sleep half a second here.
    usleep(500*1000);
  }

  // Tells the worker thread loop to stop.
  reportPacker.m_continue=false;
}
251
252
253
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
254
255
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPacker){
  // From now on the packer knows that at least one file error occurred;
  // the flush and end-of-session reports check this flag.
  reportPacker.m_errorHappened=true;

  // The exception's message is both logged and passed on to the failed job.
  const auto& failureText = m_ex.getMessageValue();
  reportPacker.m_lc.log(LOG_ERR,failureText);
  m_failedArchiveJob->failed(cta::exception::Exception(failureText));
}
259

260
261
262
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
263
264
265
MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent):
  // Keep a reference to the owning packer; run() works through it.
  m_parent(parent)
{
}
266
267
268
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
269
void MigrationReportPacker::WorkerThread::run(){
270
  m_parent.m_lc.pushOrReplace(log::Param("thread", "ReportPacker"));
271
272
  try{
    while(m_parent.m_continue) {
273
      std::unique_ptr<Report> rep (m_parent.m_fifo.pop());
274
275
276
277
278
279
      try{
        rep->execute(m_parent);
      }
      catch(const failedMigrationRecallResult& e){
        //here we catch a failed report MigrationResult. We try to close and it that fails too
        //we end up in the catch below
280
        m_parent.m_archiveMount->complete();
Daniele Kruse's avatar
Daniele Kruse committed
281
        m_parent.m_lc.log(LOG_INFO,"Successfully closed client's session after the failed report MigrationResult");
282
283
284
285
        if (m_parent.m_watchdog) {
          m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
          m_parent.m_watchdog->addParameter(log::Param("status","failure"));
        }
286
        break;
287
288
289
      }
    }
  }
290
  catch(const castor::exception::Exception& e){
291
292
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
Daniele Kruse's avatar
Daniele Kruse committed
293
    m_parent.m_lc.log(LOG_ERR,"tried to report endOfSession(WithError) and got an exception, cant do much more");
294
295
296
297
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(log::Param("status","failure"));
    }
298
299
300
301
  }
}

}}}}