/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/io/io.hpp"
#include "castor/legacymsg/CommonMarshal.hpp"
#include "castor/legacymsg/RtcpMarshal.hpp"
#include "castor/legacymsg/VdqmMarshal.hpp"
#include "castor/legacymsg/VdqmProxyTcpIp.hpp"
#include "castor/utils/SmartFd.hpp"
#include "castor/utils/utils.hpp"
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"

32
33
34
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
35
36
37
castor::legacymsg::VdqmProxyTcpIp::VdqmProxyTcpIp(log::Logger &log,
  const std::string &vdqmHostName, const unsigned short vdqmPort,
  const int netTimeout) throw():
38
39
40
41
  m_log(log),
  m_vdqmHostName(vdqmHostName),
  m_vdqmPort(vdqmPort),
  m_netTimeout(netTimeout) {
42
43
}

44
45
46
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
47
castor::legacymsg::VdqmProxyTcpIp::~VdqmProxyTcpIp() throw() {
48
49
}

50
//------------------------------------------------------------------------------
51
// setDriveDown
52
//------------------------------------------------------------------------------
53
54
void castor::legacymsg::VdqmProxyTcpIp::setDriveDown(const std::string &server,
  const std::string &unitName, const std::string &dgn)  {
55
  try {
56
57
    legacymsg::VdqmDrvRqstMsgBody body;
    body.status = VDQM_UNIT_DOWN;
58
59
60
    castor::utils::copyString(body.server, server);
    castor::utils::copyString(body.drive, unitName);
    castor::utils::copyString(body.dgn, dgn);
61

62
    setDriveStatus(body);
63
64
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
65
66
67
    ex.getMessage() << "Failed to set state of tape drive"
      ": server=" << server << " unitName=" << unitName << " dgn=" << dgn <<
      " state=down: " << ne.getMessage().str();
68
69
    throw ex;
  }
70
71
72
}

//------------------------------------------------------------------------------
73
// setDriveUp
74
//------------------------------------------------------------------------------
75
76
void castor::legacymsg::VdqmProxyTcpIp::setDriveUp(const std::string &server,
  const std::string &unitName, const std::string &dgn)  {
77
  try {
78
    legacymsg::VdqmDrvRqstMsgBody body;
79
    body.status = VDQM_UNIT_UP;
80
81
82
    castor::utils::copyString(body.server, server);
    castor::utils::copyString(body.drive, unitName);
    castor::utils::copyString(body.dgn, dgn);
83

84
    setDriveStatus(body);
85
86
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
87
88
89
    ex.getMessage() << "Failed to set state of tape drive"
      ": server=" << server << " unitName=" << unitName << " dgn=" << dgn <<
      " state=up: " << ne.getMessage().str();
90
91
    throw ex;
  }
92
93
}

94
//------------------------------------------------------------------------------
95
// assignDrive
96
//------------------------------------------------------------------------------
97
98
99
void castor::legacymsg::VdqmProxyTcpIp::assignDrive(const std::string &server,
  const std::string &unitName, const std::string &dgn,
  const uint32_t mountTransactionId, const pid_t sessionPid)  {
100
101
102
103
  try {
    legacymsg::VdqmDrvRqstMsgBody body;
    body.status = VDQM_UNIT_ASSIGN;
    body.volReqId = mountTransactionId;
104
    body.jobId = sessionPid;
105
106
107
    castor::utils::copyString(body.server, server);
    castor::utils::copyString(body.drive, unitName);
    castor::utils::copyString(body.dgn, dgn);
108

109
    setDriveStatus(body);
110
111
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
112
113
114
115
    ex.getMessage() << "Failed to assign drive"
      ": server=" << server << " unitName=" << unitName << " dgn=" << dgn <<
      " mountTransactionId=" << mountTransactionId << " sessionPid=" <<
      sessionPid << ": " << ne.getMessage().str();
116
117
118
119
    throw ex;
  }
}

120
//------------------------------------------------------------------------------
121
// tapeMounted
122
//------------------------------------------------------------------------------
123
124
125
126
void castor::legacymsg::VdqmProxyTcpIp::tapeMounted(const std::string &server,
  const std::string &unitName, const std::string &dgn, const std::string &vid,
  const pid_t sessionPid)  {

127
128
129
130
  try {
    legacymsg::VdqmDrvRqstMsgBody body;
    body.status = VDQM_VOL_MOUNT;
    body.jobId = sessionPid;
131
132
133
134
    castor::utils::copyString(body.volId, vid);
    castor::utils::copyString(body.server, server);
    castor::utils::copyString(body.drive, unitName);
    castor::utils::copyString(body.dgn, dgn);
135

136
    setDriveStatus(body);
137
138
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
139
140
141
142
    ex.getMessage() << "Failed to notify vdqm that tape is mounted"
      ": server=" << server << " unitName=" << unitName << " dgn=" << dgn <<
      " vid=" << vid << " sessionPid=" << sessionPid << ": " <<
      ne.getMessage().str();
143
144
    throw ex;
  }
145
146
}

147
//------------------------------------------------------------------------------
148
// releaseDrive
149
//------------------------------------------------------------------------------
150
151
152
153
void castor::legacymsg::VdqmProxyTcpIp::releaseDrive(const std::string &server,
  const std::string &unitName, const std::string &dgn, const bool forceUnmount,
  const pid_t sessionPid)  {

154
  int status = VDQM_UNIT_RELEASE;
155
156
157
158
159
  if(forceUnmount) {
    status |= VDQM_FORCE_UNMOUNT;
  }

  try {
160
161
    legacymsg::VdqmDrvRqstMsgBody body;
    body.status = status;
162
    body.jobId = sessionPid;
163
164
165
    castor::utils::copyString(body.server, server);
    castor::utils::copyString(body.drive, unitName);
    castor::utils::copyString(body.dgn, dgn);
166

167
    setDriveStatus(body);
168
169
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
170
171
172
173
    ex.getMessage() << "Failed to release tape drive"
      ": server=" << server << " unitName=" << unitName << " dgn=" << dgn <<
      " forceUnmount=" << forceUnmount << " sessionPid=" << sessionPid << ": "
      << ne.getMessage().str();
174
175
176
    throw ex;
  }
}
177

178
//------------------------------------------------------------------------------
179
// setDriveStatus
180
//------------------------------------------------------------------------------
181
182
void castor::legacymsg::VdqmProxyTcpIp::setDriveStatus(
  const legacymsg::VdqmDrvRqstMsgBody &body)  {
183
184
185
186
187
188
  castor::utils::SmartFd fd(connectToVdqm());
  writeDriveStatusMsg(fd.get(), body);
  readCommitAck(fd.get());
  const legacymsg::MessageHeader header = readDriveStatusMsgHeader(fd.get());
  readDriveStatusMsgBody(fd.get(), header.lenOrStatus);
  writeCommitAck(fd.get());
189
}
190
191
192
193

//-----------------------------------------------------------------------------
// connectToVdqm
//-----------------------------------------------------------------------------
194
int castor::legacymsg::VdqmProxyTcpIp::connectToVdqm() const  {
195
196
  castor::utils::SmartFd smartConnectSock;
  try {
197
    smartConnectSock.reset(io::connectWithTimeout(m_vdqmHostName, m_vdqmPort,
198
      m_netTimeout));
199
  } catch(castor::exception::Exception &ne) {
200
    castor::exception::Exception ex;
201
    ex.getMessage() << "Failed to connect to vdqm on host " << m_vdqmHostName
202
      << " port " << m_vdqmPort << ": " << ne.getMessage().str();
203
204
205
206
207
    throw ex;
  }

  return smartConnectSock.release();
}
208
209
210
211

//-----------------------------------------------------------------------------
// writeDriveStatusMsg
//-----------------------------------------------------------------------------
212
213
void castor::legacymsg::VdqmProxyTcpIp::writeDriveStatusMsg(const int fd,
  const legacymsg::VdqmDrvRqstMsgBody &body)  {
214
215
  char buf[VDQM_MSGBUFSIZ];
  const size_t len = legacymsg::marshal(buf, body);
216
217

  try {
218
    io::writeBytes(fd, m_netTimeout, len, buf);
219
  } catch(castor::exception::Exception &ne) {
220
    castor::exception::Exception ex;
221
222
223
224
225
226
227
228
229
    ex.getMessage() << "Failed to write drive status message: "
      << ne.getMessage().str();
    throw ex;
  }
}

//-----------------------------------------------------------------------------
// readCommitAck
//-----------------------------------------------------------------------------
230
void castor::legacymsg::VdqmProxyTcpIp::readCommitAck(const int fd)  {
Steven Murray's avatar
Steven Murray committed
231
  legacymsg::MessageHeader ack;
232
233

  try {
234
    ack = readAck(fd);
235
  } catch(castor::exception::Exception &ne) {
236
    castor::exception::Exception ex;
Steven Murray's avatar
Steven Murray committed
237
238
    ex.getMessage() << "Failed to read VDQM_COMMIT ack: " <<
      ne.getMessage().str();
239
240
241
    throw ex;
  }

Steven Murray's avatar
Steven Murray committed
242
  if(VDQM_MAGIC != ack.magic) {
243
    castor::exception::Exception ex;
244
    ex.getMessage() << "Failed to read VDQM_COMMIT ack: Invalid magic"
Steven Murray's avatar
Steven Murray committed
245
      ": expected=0x" << std::hex << VDQM_MAGIC << " actual=" << ack.magic;
246
247
    throw ex;
  }
248
249
250
251
252
253
254
255

  if(VDQM_COMMIT == ack.reqType) {
    // Read a successful VDQM_COMMIT ack
    return;
  } else if(0 < ack.reqType) {
    // VDQM_COMMIT ack is reporting an error
    char errBuf[80];
    sstrerror_r(ack.reqType, errBuf, sizeof(errBuf));
256
    castor::exception::Exception ex;
257
258
259
260
    ex.getMessage() << "VDQM_COMMIT ack reported an error: " << errBuf;
    throw ex;
  } else {
    // VDQM_COMMIT ack contains an invalid request type
261
    castor::exception::Exception ex;
262
263
    ex.getMessage() << "VDQM_COMMIT ack contains an invalid request type"
      ": reqType=" << ack.reqType;
Steven Murray's avatar
Steven Murray committed
264
265
266
267
268
269
270
    throw ex;
  }
}

//-----------------------------------------------------------------------------
// readAck
//-----------------------------------------------------------------------------
271
272
castor::legacymsg::MessageHeader castor::legacymsg::VdqmProxyTcpIp::readAck(
  const int fd)  {
Steven Murray's avatar
Steven Murray committed
273
274
275
276
  char buf[12]; // Magic + type + len
  legacymsg::MessageHeader ack;

  try {
277
    io::readBytes(fd, m_netTimeout, sizeof(buf), buf);
Steven Murray's avatar
Steven Murray committed
278
  } catch(castor::exception::Exception &ne) {
279
    castor::exception::Exception ex;
Steven Murray's avatar
Steven Murray committed
280
    ex.getMessage() << "Failed to read ack: "
Steven Murray's avatar
Steven Murray committed
281
      << ne.getMessage().str();
282
283
    throw ex;
  }
Steven Murray's avatar
Steven Murray committed
284
285
286
287
288
289

  const char *bufPtr = buf;
  size_t bufLen = sizeof(buf);
  legacymsg::unmarshal(bufPtr, bufLen, ack);

  return ack;
290
291
292
293
294
}

//-----------------------------------------------------------------------------
// readDriveStatusMsgHeader
//-----------------------------------------------------------------------------
295
296
castor::legacymsg::MessageHeader castor::legacymsg::VdqmProxyTcpIp::
  readDriveStatusMsgHeader(const int fd) {
297
298
299
300
  char buf[12]; // Magic + type + len
  legacymsg::MessageHeader header;

  try {
301
    io::readBytes(fd, m_netTimeout, sizeof(buf), buf);
302
  } catch(castor::exception::Exception &ne) {
303
    castor::exception::Exception ex;
304
305
306
307
308
309
310
311
312
313
    ex.getMessage() << "Failed to read header of drive status message: "
      << ne.getMessage().str();
    throw ex;
  }

  const char *bufPtr = buf;
  size_t bufLen = sizeof(buf);
  legacymsg::unmarshal(bufPtr, bufLen, header);

  if(VDQM_MAGIC != header.magic) {
314
    castor::exception::Exception ex;
315
316
317
318
319
320
    ex.getMessage() << "Failed to read header of drive status message"
      ": Invalid magic: expected=0x" << std::hex << VDQM_MAGIC << " actual=0x"
      << header.magic;
    throw ex;
  }
  if(VDQM_DRV_REQ != header.reqType) {
321
    castor::exception::Exception ex;
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
    ex.getMessage() << "Failed to read header of drive status message"
      ": Invalid request type: expected=0x" << std::hex << VDQM_DRV_REQ <<
      " actual=0x" << header.reqType;
    throw ex;
  }

  // The length of the message body is checked later, just before it is read in
  // to memory

  return header;
}

//-----------------------------------------------------------------------------
// readDriveStatusMsgBody
//-----------------------------------------------------------------------------
337
338
castor::legacymsg::VdqmDrvRqstMsgBody castor::legacymsg::VdqmProxyTcpIp::
  readDriveStatusMsgBody(const int fd, const uint32_t bodyLen)  {
339
340
341
  char buf[VDQM_MSGBUFSIZ];

  if(sizeof(buf) < bodyLen) {
342
    castor::exception::Exception ex;
343
    ex.getMessage() << "Failed to read body of drive status message"
344
345
      ": Maximum body length exceeded: max=" << sizeof(buf) <<
      " actual=" << bodyLen;
346
347
348
349
    throw ex;
  }

  try {
350
    io::readBytes(fd, m_netTimeout, bodyLen, buf);
351
  } catch(castor::exception::Exception &ne) {
352
    castor::exception::Exception ex;
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
    ex.getMessage() << "Failed to read body of drive status message"
      ": " << ne.getMessage().str();
    throw ex;
  }

  legacymsg::VdqmDrvRqstMsgBody body;
  const char *bufPtr = buf;
  size_t bufLen = sizeof(buf);
  legacymsg::unmarshal(bufPtr, bufLen, body);
  return body;
}

//-----------------------------------------------------------------------------
// writeCommitAck
//-----------------------------------------------------------------------------
368
void castor::legacymsg::VdqmProxyTcpIp::writeCommitAck(const int fd)  {
369
370
371
372
373
374
375
376
377
  legacymsg::MessageHeader ack;
  ack.magic = VDQM_MAGIC;
  ack.reqType = VDQM_COMMIT;
  ack.lenOrStatus = 0;

  char buf[12]; // magic + reqType + len
  legacymsg::marshal(buf, ack);

  try {
378
    io::writeBytes(fd, m_netTimeout, sizeof(buf), buf);
379
380
381
382
383
384
385
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to write VDQM_COMMIT ack: " <<
      ne.getMessage().str();
    throw ex;
  }
}
386
387
388
389

//-----------------------------------------------------------------------------
// tapeUnmounted
//-----------------------------------------------------------------------------
390
391
392
void  castor::legacymsg::VdqmProxyTcpIp::tapeUnmounted(
  const std::string &server, const std::string &unitName,
  const std::string &dgn, const std::string &vid)  {
393
394
395
396
397
  int status = VDQM_VOL_UNMOUNT;

  try {
    legacymsg::VdqmDrvRqstMsgBody body;
    body.status = status;
398
399
400
401
    castor::utils::copyString(body.volId, vid);
    castor::utils::copyString(body.server, server);
    castor::utils::copyString(body.drive, unitName);
    castor::utils::copyString(body.dgn, dgn);
402

403
    setDriveStatus(body);
404
405
406
407
408
409
410
411
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify vdqm that tape " << vid <<
      " was unmounted from tape drive " << unitName << ": " <<
      ne.getMessage().str();
    throw ex;
  }
}