ClientProxy.cpp 15.2 KB
Newer Older
1
/******************************************************************************
2
 *                      ClientProxy.cpp
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

25
#include "ClientProxy.hpp"
26
27
28
29
30
31
32
#include "castor/io/ClientSocket.hpp"
#include "castor/tape/tapegateway/VolumeRequest.hpp"
#include "castor/tape/tapegateway/Volume.hpp"
#include "castor/tape/tapegateway/NoMoreFiles.hpp"
#include "castor/tape/tapegateway/EndNotificationErrorReport.hpp"
#include "castor/tape/tapegateway/EndNotification.hpp"
#include "castor/tape/tapegateway/NotificationAcknowledge.hpp"
33
34
35
36
37
38
#include "castor/tape/tapegateway/FilesToMigrateListRequest.hpp"
#include "castor/tape/tapegateway/FilesToMigrateList.hpp"
#include "castor/tape/tapegateway/FileMigrationReportList.hpp"
#include "castor/tape/tapegateway/FilesToRecallListRequest.hpp"
#include "castor/tape/tapegateway/FilesToRecallList.hpp"
#include "castor/tape/tapegateway/FileRecallReportList.hpp"
Eric Cano's avatar
Eric Cano committed
39
#include "castor/tape/utils/Timer.hpp"
40
#include "castor/tape/utils/utils.hpp"
41
42
43
44
45
#include <cxxabi.h>
#include <memory>
#include <stdlib.h>
#include <typeinfo>

46
47
48
49
50
51
namespace castor {
namespace tape {
namespace tapeserver {
namespace client {
  

52
ClientProxy::ClientProxy(const legacymsg::RtcpJobRqstMsgBody& clientRequest):
53
m_request(clientRequest),m_transactionId(0) {}
54

55
ClientProxy::UnexpectedResponse::
56
57
58
59
60
61
62
63
64
65
66
67
    UnexpectedResponse(const castor::IObject* resp, const std::string & w):
castor::tape::Exception(w) {
  std::string responseType = typeid(*resp).name();
  int status = -1;
  char * demangled = abi::__cxa_demangle(responseType.c_str(), NULL, NULL, &status);
  if (!status)
    responseType = demangled;
  free(demangled);
  getMessage() << " Response type was: " << responseType;
}

tapegateway::GatewayMessage *
68
  ClientProxy::requestResponseSession(
Eric Cano's avatar
Eric Cano committed
69
70
    const tapegateway::GatewayMessage &req,
    RequestReport & report)
71
{
Eric Cano's avatar
Eric Cano committed
72
73
  // 0) Start the stopwatch
  castor::tape::utils::Timer timer;
74
75
76
77
  // 1) We re-open connection to client for each request-response exchange
  castor::io::ClientSocket clientConnection(m_request.clientPort, 
      m_request.clientHost);
  clientConnection.connect();
Eric Cano's avatar
Eric Cano committed
78
  report.connectDuration = timer.secs();
79
80
  // 2) The actual exchange over the network.
  clientConnection.sendObject(const_cast<tapegateway::GatewayMessage &>(req));
Eric Cano's avatar
Eric Cano committed
81
82
  std::auto_ptr<castor::IObject> resp (clientConnection.readObject());
  report.sendRecvDuration = timer.secs();
83
84
85
86
87
88
89
90
91
92
93
  // 3) Check response type
  tapegateway::GatewayMessage * ret = 
      dynamic_cast<tapegateway::GatewayMessage*>(resp.get());
  if (NULL == ret) {
    throw UnexpectedResponse(resp.get(), 
        "In castor::tape::server::clientInterface::requestResponseSession, "
        "expected a tapegateway::GatewayMessage response. ");
  }
  // 4) Check we get a response for the request we sent (sanity check)
  if ((ret->mountTransactionId() != m_request.volReqId) ||
      ret->aggregatorTransactionId() != req.aggregatorTransactionId()) {
94
95
96
97
98
99
100
101
102
103
104
    std::stringstream mess;
    if (ret->mountTransactionId() != m_request.volReqId) {
    mess << "In castor::tape::server::clientInterface::requestResponseSession, "
        "expected a information about mountSessionId=" << m_request.volReqId
        << " and received: " << ret->mountTransactionId();
    } else {
    mess << "In castor::tape::server::clientInterface::requestResponseSession, "
        "expected a response for tapebridgeTransId=" << req.aggregatorTransactionId()
        << " and received for: " << ret->aggregatorTransactionId();
    }
    throw UnexpectedResponse(resp.get(), mess.str());
105
106
107
108
109
110
  }
  // Slightly ugly sequence in order to not duplicate the dynamic_cast.
  resp.release();
  return ret;
}

111
void ClientProxy::fetchVolumeId(
Eric Cano's avatar
Eric Cano committed
112
  VolumeInfo & volInfo, RequestReport &report) 
113
{
114
115
116
117
118
119
  // 1) Build the request
  castor::tape::tapegateway::VolumeRequest request;
  request.setMountTransactionId(m_request.volReqId);
  // The transaction id is auto-incremented (through the cast operator) 
  // so we just need to use it (and save the value locally if we need to reuse 
  // it)
120
  report.transactionId = ++m_transactionId;
Eric Cano's avatar
Eric Cano committed
121
  request.setAggregatorTransactionId(report.transactionId);
122
123
124
  request.setUnit(m_request.driveUnit);
  // 2) get the reply from the client
  std::auto_ptr<tapegateway::GatewayMessage> 
Eric Cano's avatar
Eric Cano committed
125
      response (requestResponseSession(request, report));
126
127
128
129
130
131
  // 3) Process the possible outputs
  tapegateway::Volume * volume;
  tapegateway::NoMoreFiles * noMore;
  tapegateway::EndNotificationErrorReport * errorReport;
  // 3) a) Volume: this is the good day answer
  if (NULL != (volume = dynamic_cast<tapegateway::Volume *>(response.get()))) {
Eric Cano's avatar
Eric Cano committed
132
133
134
135
136
    volInfo.vid = volume->vid();
    volInfo.clientType = volume->clientType();
    volInfo.density = volume->density();
    volInfo.labelObsolete = volume->label();
    volInfo.volumeMode = volume->mode();
137
138
139
140
141
142
143
144
145
  // 3) b) Straight noMoreFiles answer: at least we know.
  } else if (NULL != (noMore = dynamic_cast<tapegateway::NoMoreFiles *>(response.get()))) {
    throw EndOfSession("Client replied noMoreFiles directly to volume request");
  // 3) c) End notification error report.
  } else if (NULL != (errorReport = dynamic_cast<tapegateway::EndNotificationErrorReport *>(
      response.get()))) {
    EndOfSession eos("Client replied noMoreFiles directly to volume request: ");
        eos.getMessage() << "errorCode=" << errorReport->errorCode()
        << "errorReport=\"" <<  errorReport->errorMessage() << "\"";
146
        throw eos;
147
148
149
150
151
152
153
  // Unexpected response type  
  } else {
    throw UnexpectedResponse(response.get(), "Unexpected response from client in response "
        "to a volume request");
  }
}

154
void ClientProxy::reportEndOfSession(
Eric Cano's avatar
Eric Cano committed
155
RequestReport &transactionReport) 
156
{
157
  // 1) Build the report
Eric Cano's avatar
Eric Cano committed
158
  castor::tape::tapegateway::EndNotification endReport;
159
  transactionReport.transactionId = ++m_transactionId;
Eric Cano's avatar
Eric Cano committed
160
  endReport.setMountTransactionId(m_request.volReqId);
161
  endReport.setAggregatorTransactionId(transactionReport.transactionId);
162
163
164
165
166
167
168
  // 2) Send the report
  std::auto_ptr<tapegateway::GatewayMessage> ack(
      requestResponseSession(endReport, transactionReport));
  // 3) If we did not get a ack, complain (not much more we can do)
  // We could use the castor typing here, but we stick to case for homogeneity
  // of the code.
  try {
Eric Cano's avatar
Eric Cano committed
169
170
171
172
173
174
175
    // Here we are only interested by the fact that we received a 
    // notificationAcknowledge. The matching of the transactionId has already
    // been checked automatically by requestResponseSession.
    // The dynamic cast to reference will conveniently throw an exception
    // it we did not get the acknowledgement. We cast it to void to silence
    // some compilers (at least clang) which complain that the return value 
    // of the cast is not used.
Eric Cano's avatar
Eric Cano committed
176
    (void)dynamic_cast<tapegateway::NotificationAcknowledge &>(*ack.get());
177
178
179
180
181
182
183
  } catch (std::bad_cast) {
    throw UnexpectedResponse(ack.get(), 
        "Unexpected response when reporting end of session");
  }
}


184
void ClientProxy::reportEndOfSessionWithError(
185
const std::string & errorMsg, int errorCode, RequestReport &transactionReport) 
186
{
187
188
  // 1) Build the report
  castor::tape::tapegateway::EndNotificationErrorReport endReport;
189
  transactionReport.transactionId = ++m_transactionId;
190
  endReport.setMountTransactionId(m_request.volReqId);
191
  endReport.setAggregatorTransactionId(transactionReport.transactionId);
192
193
  endReport.setErrorMessage(errorMsg);
  endReport.setErrorCode(errorCode);
194
195
  // 2) Send the report
  std::auto_ptr<tapegateway::GatewayMessage> ack(
Eric Cano's avatar
Eric Cano committed
196
      requestResponseSession(endReport, transactionReport));
197
198
199
200
  // 3) If we did not get a ack, complain (not much more we can do)
  // We could use the castor typing here, but we stick to case for homogeneity
  // of the code.
  try {
Eric Cano's avatar
Eric Cano committed
201
202
203
204
205
206
207
    // Here we are only interested by the fact that we received a 
    // notificationAcknowledge. The matching of the transactionId has already
    // been checked automatically by requestResponseSession.
    // The dynamic cast to reference will conveniently throw an exception
    // it we did not get the acknowledgement. We cast it to void to silence
    // some compilers (at least clang) which complain that the return value 
    // of the cast is not used.
Eric Cano's avatar
Eric Cano committed
208
    (void)dynamic_cast<tapegateway::NotificationAcknowledge &>(*ack.get());
209
210
211
212
  } catch (std::bad_cast) {
    throw UnexpectedResponse(ack.get(), 
        "Unexpected response when reporting end of session");
  }
213
}
214
215


216
tapegateway::FilesToMigrateList * 
217
    ClientProxy::getFilesToMigrate(
218
uint64_t files, uint64_t bytes, RequestReport& report) 
219
{
220
221
222
223
224
  // 1) Build the request
  castor::tape::tapegateway::FilesToMigrateListRequest ftmReq;
  report.transactionId = ++m_transactionId;
  ftmReq.setMountTransactionId(m_request.volReqId);
  ftmReq.setAggregatorTransactionId(report.transactionId);
225
  ftmReq.setMaxFiles(files);
226
227
228
229
  ftmReq.setMaxBytes(bytes);
  // 2) Exchange messages with the server
  std::auto_ptr<tapegateway::GatewayMessage> resp(
      requestResponseSession(ftmReq, report));
230
231
  // 3) We expect either a NoMoreFiles response or FilesToMigrateList
  // 3a) Handle the FilesToMigrateList
232
233
234
235
236
237
238
239
240
241
242
243
  try {
    tapegateway::FilesToMigrateList & ftm  =
        dynamic_cast <tapegateway::FilesToMigrateList &>(*resp);
    if (ftm.filesToMigrate().size()) {
      resp.release();
      return &ftm;
    } else {
      return NULL;
    }
  } catch (std::bad_cast) {}
  // 3b) Try again with NoMoreFiles (and this time failure is fatal)
  try {
Eric Cano's avatar
Eric Cano committed
244
245
246
    // As in reportEndOfSession, we are only interested in receiving a 
    // NoMoreFiles message. (void) for picky compilers
    (void)dynamic_cast<tapegateway::NoMoreFiles &>(*resp);
247
248
249
250
251
    return NULL;
  } catch (std::bad_cast) {
    throw UnexpectedResponse(resp.get(),
        "Unexpected response to FilesToMigrateListRequest in getFilesToMigrate");
  }
252
253
}

254
void ClientProxy::reportMigrationResults(
Eric Cano's avatar
Eric Cano committed
255
tapegateway::FileMigrationReportList & migrationReport,
256
    RequestReport& report) {
Eric Cano's avatar
Eric Cano committed
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
  // 1) The request is provided already fleshed out by the user. We have to
  // add the administrative numbering
  migrationReport.setMountTransactionId(m_request.volReqId);
  report.transactionId = ++m_transactionId;
  migrationReport.setAggregatorTransactionId(report.transactionId);
  // The next 2 parameters are currently set to hardcoded defaults (as were in
  // the tape bridge). They were created in prevision of an evolution where
  // mode responsibility of central servers updates was to be pushed to the 
  // tape server.
  migrationReport.setFseqSet(false);
  migrationReport.setFseq(0);
  // 2) Exchange messages with the server
  std::auto_ptr<tapegateway::GatewayMessage> resp(
      requestResponseSession(migrationReport, report));
  // 3) We expect 2 types of return messages: NotificationAcknowledge and
  // EndNotificationErrorReport.
  // 3a) Handle the NotificationAcknowledge
  try {
Eric Cano's avatar
Eric Cano committed
275
276
277
    // As in reportEndOfSession, we are only interested in receiving a 
    // NotificationAcknowledge message. (void) for picky compilers
    (void)dynamic_cast<tapegateway::NotificationAcknowledge &>(*resp);
Eric Cano's avatar
Eric Cano committed
278
279
280
281
282
283
284
    return;
  } catch (std::bad_cast) {}
  // 3b) Handle the end notification error report and turn it into a bare
  // tape::exception
  try {
    tapegateway::EndNotificationErrorReport & err = 
        dynamic_cast<tapegateway::EndNotificationErrorReport &> (*resp);
285
286
287
    std::stringstream mess;
    mess << "End notification report: errorMessage=\""
        << err.errorMessage() << "\" errorCode=" << err.errorCode();
Eric Cano's avatar
Eric Cano committed
288
289
290
291
292
    throw castor::tape::Exception(mess.str());
  } catch (std::bad_cast) {
    throw UnexpectedResponse(resp.get(),
        "Unexpected response to FileMigrationReportList in reportMigrationResults");
  }
293
294
}

Eric Cano's avatar
Eric Cano committed
295
tapegateway::FilesToRecallList * 
296
    ClientProxy::getFilesToRecall(
297
uint64_t files, uint64_t bytes, RequestReport& report) 
298
{
Eric Cano's avatar
Eric Cano committed
299
300
301
302
303
  // 1) Build the request
  castor::tape::tapegateway::FilesToRecallListRequest ftrReq;
  report.transactionId = ++m_transactionId;
  ftrReq.setMountTransactionId(m_request.volReqId);
  ftrReq.setAggregatorTransactionId(report.transactionId);
304
  ftrReq.setMaxFiles(files);
Eric Cano's avatar
Eric Cano committed
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
  ftrReq.setMaxBytes(bytes);
  // 2) Exchange messages with the server
  std::auto_ptr<tapegateway::GatewayMessage> resp(
      requestResponseSession(ftrReq, report));
  // 3) We expect either a NoMoreFiles response or FilesToRecallList
  // 3a) Handle the FilesToRecallListl
  try {
    tapegateway::FilesToRecallList & ftr  =
        dynamic_cast <tapegateway::FilesToRecallList &>(*resp);
    if (ftr.filesToRecall().size()) {
      resp.release();
      return &ftr;
    } else {
      return NULL;
    }
  } catch (std::bad_cast) {}
  // 3b) Try again with NoMoreFiles (and this time failure is fatal)
  try {
Eric Cano's avatar
Eric Cano committed
323
324
325
    // As in reportEndOfSession, we are only interested in receiving a 
    // NoMoreFiles message. (void) for picky compilers
    (void)dynamic_cast<tapegateway::NoMoreFiles &>(*resp);
Eric Cano's avatar
Eric Cano committed
326
327
328
329
330
    return NULL;
  } catch (std::bad_cast) {
    throw UnexpectedResponse(resp.get(),
        "Unexpected response to FilesToRecallListRequest in getFilesToRecall");
  }
331
332
}

333
void ClientProxy::reportRecallResults(
Eric Cano's avatar
Eric Cano committed
334
tapegateway::FileRecallReportList & recallReport,
335
RequestReport& report) {
Eric Cano's avatar
Eric Cano committed
336
337
338
339
340
341
342
343
344
345
346
347
  // 1) The request is provided already fleshed out by the user. We have to
  // add the administrative numbering
  recallReport.setMountTransactionId(m_request.volReqId);
  report.transactionId = ++m_transactionId;
  recallReport.setAggregatorTransactionId(report.transactionId);
  // 2) Exchange messages with the server
  std::auto_ptr<tapegateway::GatewayMessage> resp(
      requestResponseSession(recallReport, report));
  // 3) We expect 2 types of return messages: NotificationAcknowledge and
  // EndNotificationErrorReport.
  // 3a) Handle the NotificationAcknowledge
  try {
Eric Cano's avatar
Eric Cano committed
348
349
350
    // As in reportEndOfSession, we are only interested in receiving a 
    // NotificationAcknowledge message. (void) for picky compilers
    (void)dynamic_cast<tapegateway::NotificationAcknowledge &>(*resp);
Eric Cano's avatar
Eric Cano committed
351
352
353
354
355
356
357
    return;
  } catch (std::bad_cast) {}
  // 3b) Handle the end notification error report and turn it into a bare
  // tape::exception
  try {
    tapegateway::EndNotificationErrorReport & err = 
        dynamic_cast<tapegateway::EndNotificationErrorReport &> (*resp);
358
359
360
    std::stringstream mess;
    mess << "End notification report: errorMessage=\""
        << err.errorMessage() << "\" errorCode=" << err.errorCode();
Eric Cano's avatar
Eric Cano committed
361
362
363
364
365
    throw castor::tape::Exception(mess.str());
  } catch (std::bad_cast) {
    throw UnexpectedResponse(resp.get(),
        "Unexpected response to FileRecallReportList in reportRecallResults");
  }
366
}
367
368

}}}}