/******************************************************************************
 *                      MountSession.cpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include <memory>
#include <sstream>
#include <string>

#include "MountSession.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "log.h"
#include "stager_client_commandline.h"
#include "castor/tape/utils/utils.hpp"
#include "castor/System.hpp"
#include "h/serrno.h"
#include "castor/tape/tapeserver/SCSI/Device.hpp"
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "RecallTaskInjector.hpp"
#include "RecallReportPacker.hpp"

using namespace castor::tape;
using namespace castor::log;

/**
 * Constructor.
 *
 * Records everything the session needs in the corresponding members and
 * builds the client proxy from the client request.
 *
 * @param clientRequest the job request received for this mount
 * @param logger        the logger used to build the session's log context
 * @param sysWrapper    the system wrapper handed to the SCSI/drive layers
 * @param tpConfig      the parsed TPCONFIG lines
 * @param castorConf    the Castor configuration values used by the session
 */
castor::tape::tapeserver::daemon::MountSession::MountSession(
    const legacymsg::RtcpJobRqstMsgBody& clientRequest,
    castor::log::Logger& logger,
    System::virtualWrapper & sysWrapper,
    const utils::TpconfigLines & tpConfig,
    const CastorConf & castorConf):
    m_request(clientRequest),
    m_logger(logger),
    m_clientProxy(clientRequest),
    m_sysWrapper(sysWrapper),
    m_tpConfig(tpConfig),
    m_castorConf(castorConf)
{
}

void castor::tape::tapeserver::daemon::MountSession::execute()
Eric Cano's avatar
Eric Cano committed
53
54
55
throw (castor::tape::Exception) {
  // 1) Prepare the logging environment
  LogContext lc(m_logger);
56
57
  // Create a sticky thread name, which will be overridden by the other threads
  lc.pushOrReplace(Param("thread", "mainThread"));
Eric Cano's avatar
Eric Cano committed
58
59
60
61
62
63
  LogContext::ScopedParam sp01(lc, Param("clientHost", m_request.clientHost));
  LogContext::ScopedParam sp02(lc, Param("clientPort", m_request.clientPort));
  LogContext::ScopedParam sp03(lc, Param("mountTransactionId", m_request.volReqId));
  LogContext::ScopedParam sp04(lc, Param("volReqId", m_request.volReqId));
  LogContext::ScopedParam sp05(lc, Param("driveUnit", m_request.driveUnit));
  LogContext::ScopedParam sp06(lc, Param("dgn", m_request.dgn));
64
  // 2a) Get initial information from the client
65
  client::ClientProxy::RequestReport reqReport;
Eric Cano's avatar
Eric Cano committed
66
  try {
67
    m_clientProxy.fetchVolumeId(m_volInfo, reqReport);
68
  } catch(client::ClientProxy::EndOfSession & eof) {
69
70
71
    std::stringstream fullError;
    fullError << "Received end of session from client when requesting Volume"
      << eof.what();
72
    lc.log(LOG_ERR, fullError.str());
73
    m_clientProxy.reportEndOfSession(reqReport);
Eric Cano's avatar
Eric Cano committed
74
75
76
    LogContext::ScopedParam sp07(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp08(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp09(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
77
78
79
    LogContext::ScopedParam sp10(lc, Param("ErrorMsg", fullError.str()));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
80
  } catch (client::ClientProxy::UnexpectedResponse & unexp) {
81
82
83
    std::stringstream fullError;
    fullError << "Received unexpected response from client when requesting Volume"
      << unexp.what();
Eric Cano's avatar
Eric Cano committed
84
    lc.log(LOG_ERR, fullError.str());
85
    m_clientProxy.reportEndOfSession(reqReport);
Eric Cano's avatar
Eric Cano committed
86
87
88
89
90
91
92
    LogContext::ScopedParam sp07(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp08(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp09(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp10(lc, Param("ErrorMsg", fullError.str()));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
  }
93
  // 2b) ... and log.
Eric Cano's avatar
Eric Cano committed
94
  // Make the TPVID parameter permanent.
95
96
97
98
99
100
101
102
103
104
105
106
  LogContext::ScopedParam sp07(lc, Param("TPVID", m_request.dgn));
  {
    LogContext::ScopedParam sp08(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp09(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp00(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp11(lc, Param("TPVID", m_volInfo.vid));
    LogContext::ScopedParam sp12(lc, Param("density", m_volInfo.density));
    LogContext::ScopedParam sp13(lc, Param("label", m_volInfo.labelObsolete));
    LogContext::ScopedParam sp14(lc, Param("clientType", utils::volumeClientTypeToString(m_volInfo.clientType)));
    LogContext::ScopedParam sp15(lc, Param("mode", utils::volumeModeToString(m_volInfo.volumeMode)));
    lc.log(LOG_INFO, "Got volume from client");
  }
Eric Cano's avatar
Eric Cano committed
107
108
  
  // Depending on the type of session, branch into the right execution
109
110
111
112
113
114
115
116
117
118
119
120
121
  switch(m_volInfo.volumeMode) {
  case tapegateway::READ:
    executeRead(lc);
    return;
  case tapegateway::WRITE:
    executeWrite(lc);
    return;
  case tapegateway::DUMP:
    executeDump(lc);
    return;
  }
}

122
void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc) {
123
124
125
  // We are ready to start the session. In case of read there is no interest in
  // creating the machinery before getting the tape mounted, so do it now.
  // 1) Get hold of the drive and check it.
126
  utils::TpconfigLines::const_iterator configLine;
127
128
  for (configLine = m_tpConfig.begin(); configLine != m_tpConfig.end(); configLine++) {
    if (configLine->unitName == m_request.driveUnit && configLine->density == m_volInfo.density) {
129
130
131
132
      break;
    }
  }
  // If we did not find the drive in the tpConfig, we have a problem
133
  if (configLine == m_tpConfig.end()) {
134
    LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
135
    lc.log(LOG_ERR, "Drive unit not found in TPCONFIG");
136
    
137
    client::ClientProxy::RequestReport reqReport;
138
139
    std::stringstream errMsg;
    errMsg << "Drive unit not found in TPCONFIG" << lc;
140
    m_clientProxy.reportEndOfSessionWithError("Drive unit not found", SEINTERNAL, reqReport);
141
142
143
144
145
146
147
148
    LogContext::ScopedParam sp09(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp10(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp11(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp12(lc, Param("errorMessage", errMsg.str()));
    LogContext::ScopedParam sp13(lc, Param("errorCode", SEINTERNAL));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
  }
149
150
  // Actually find the drive.
  castor::tape::SCSI::DeviceVector dv(m_sysWrapper);
151
152
153
154
155
156
157
158
159
  castor::tape::SCSI::DeviceInfo driveInfo;
  try {
    driveInfo = dv.findBySymlink(configLine->devFilename);
  } catch (castor::tape::SCSI::DeviceVector::NotFound & e) {
    // We could not find this drive in the system's SCSI devices
    LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
    LogContext::ScopedParam sp09(lc, Param("devFilename", configLine->devFilename));
    lc.log(LOG_ERR, "Drive not found on this path");
    
160
    client::ClientProxy::RequestReport reqReport;
161
162
    std::stringstream errMsg;
    errMsg << "Drive not found on this path" << lc;
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
    m_clientProxy.reportEndOfSessionWithError("Drive unit not found", SEINTERNAL, reqReport);
    LogContext::ScopedParam sp10(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp11(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp12(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
    LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
  } catch (castor::exception::Exception & e) {
    // We could not find this drive in the system's SCSI devices
    LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
    LogContext::ScopedParam sp09(lc, Param("devFilename", configLine->devFilename));
    LogContext::ScopedParam sp10(lc, Param("errorMessage", e.getMessageValue()));
    lc.log(LOG_ERR, "Error looking to path to tape drive");
    
178
    client::ClientProxy::RequestReport reqReport;
179
180
    std::stringstream errMsg;
    errMsg << "Error looking to path to tape drive: " << lc;
181
182
183
184
185
186
187
188
189
190
191
192
193
194
    m_clientProxy.reportEndOfSessionWithError("Drive unit not found", SEINTERNAL, reqReport);
    LogContext::ScopedParam sp11(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp12(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp13(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp14(lc, Param("errorMessage", errMsg.str()));
    LogContext::ScopedParam sp15(lc, Param("errorCode", SEINTERNAL));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
  } catch (...) {
    // We could not find this drive in the system's SCSI devices
    LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
    LogContext::ScopedParam sp09(lc, Param("devFilename", configLine->devFilename));
    lc.log(LOG_ERR, "Unexpected exception while looking for drive");
    
195
    client::ClientProxy::RequestReport reqReport;
196
197
    std::stringstream errMsg;
    errMsg << "Unexpected exception while looking for drive" << lc;
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
    m_clientProxy.reportEndOfSessionWithError("Drive unit not found", SEINTERNAL, reqReport);
    LogContext::ScopedParam sp10(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp11(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp12(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
    LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
  }
  std::auto_ptr<castor::tape::drives::DriveInterface> drive;
  try {
    drive.reset(castor::tape::drives::DriveFactory(driveInfo, m_sysWrapper));
  } catch (castor::exception::Exception & e) {
    // We could not find this drive in the system's SCSI devices
    LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
    LogContext::ScopedParam sp09(lc, Param("devFilename", configLine->devFilename));
    LogContext::ScopedParam sp10(lc, Param("errorMessage", e.getMessageValue()));
    lc.log(LOG_ERR, "Error opening tape drive");
    
217
    client::ClientProxy::RequestReport reqReport;
218
219
    std::stringstream errMsg;
    errMsg << "Error opening tape drive" << lc;
220
221
222
223
224
225
226
227
228
229
230
231
232
233
    m_clientProxy.reportEndOfSessionWithError("Drive unit not found", SEINTERNAL, reqReport);
    LogContext::ScopedParam sp11(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp12(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp13(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp14(lc, Param("errorMessage", errMsg.str()));
    LogContext::ScopedParam sp15(lc, Param("errorCode", SEINTERNAL));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
  } catch (...) {
    // We could not find this drive in the system's SCSI devices
    LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
    LogContext::ScopedParam sp09(lc, Param("devFilename", configLine->devFilename));
    lc.log(LOG_ERR, "Unexpected exception while opening drive");
    
234
    client::ClientProxy::RequestReport reqReport;
235
236
    std::stringstream errMsg;
    errMsg << "Unexpected exception while opening drive" << lc;
237
238
239
240
241
242
243
244
245
    m_clientProxy.reportEndOfSessionWithError("Drive unit not found", SEINTERNAL, reqReport);
    LogContext::ScopedParam sp10(lc, Param("tapebridgeTransId", reqReport.transactionId));
    LogContext::ScopedParam sp11(lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp12(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
    LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
    lc.log(LOG_ERR, "Notified client of end session with error");
    return;
  }
246
247
  // We can now start instantiating all the components of the data path
  {
248
249
    // Allocate all the elements of the memory management (in proper order
    // to refer them to each other)
250
    RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz);
251
252
    TapeReadSingleThread trst(*drive, m_volInfo.vid, 
        m_castorConf.tapebridgeBulkRequestRecallMaxFiles, lc);
253
254
255
    RecallReportPacker rrp(m_clientProxy,
        m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
        lc);
256
257
    DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
        m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
258
259
260
        m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
        rrp,
        lc);
261
    RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy, lc);
262
    trst.setTaskInjector(&rti);
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
    
    // We are now ready to put everything in motion. First step is to check
    // we get any concrete job to be done from the client (via the task injector)
    if (rti.synchronousInjection(m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
        m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
      // We got something to recall. Time to start the machinery
      trst.startThreads();
      dwtp.startThreads();
      rrp.startThreads();
      rti.startThreads();
      // This thread is now going to be idle until the system unwinds at the end 
      // of the session
      // All client notifications are done by the report packer, including the
      // end of session
      rti.waitThreads();
      rrp.waitThread();
      dwtp.waitThreads();
      trst.waitThreads();
    } else {
      // Just log this was an empty mount and that's it. The memory management
      // will be deallocated automatically.
      lc.log(LOG_ERR, "Aborting recall mount startup: empty mount");
285
      client::ClientProxy::RequestReport reqReport;
286
287
288
289
290
291
292
293
      m_clientProxy.reportEndOfSessionWithError("Aborted: empty recall mount", SEINTERNAL, reqReport);
      LogContext::ScopedParam sp08(lc, Param("tapebridgeTransId", reqReport.transactionId));
      LogContext::ScopedParam sp09(lc, Param("connectDuration", reqReport.connectDuration));
      LogContext::ScopedParam sp10(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
      LogContext::ScopedParam sp11(lc, Param("errorMessage", "Aborted: empty recall mount"));
      LogContext::ScopedParam sp12(lc, Param("errorCode", SEINTERNAL));
      lc.log(LOG_ERR, "Notified client of end session with error");
    }
294
  }
295
296
}

297
/**
 * Runs a migration (write to tape) session.
 *
 * Placeholder: the body is currently empty, so the function returns
 * immediately without doing any work.
 *
 * @param lc the log context of the session (not used yet)
 */
void castor::tape::tapeserver::daemon::MountSession::executeWrite(LogContext & lc) {
  // Nothing implemented yet; reference the parameter so it is not reported
  // as unused.
  (void) lc;
}

/**
 * Runs a dump session.
 *
 * Placeholder: the body is currently empty, so the function returns
 * immediately without doing any work.
 *
 * @param lc the log context of the session (not used yet)
 */
void castor::tape::tapeserver::daemon::MountSession::executeDump(LogContext & lc) {
  // TODO: as for a read session, the first step will be to get hold of the
  // drive and check it.
  (void) lc;
}