/******************************************************************************
 *                      RcpJobSubmitter.cpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *
 *
 *
 * @author Steven Murray Steven.Murray@cern.ch
 *****************************************************************************/

26
#include "castor/exception/Internal.hpp"
27
28
29
30
#include "castor/exception/InvalidArgument.hpp"
#include "castor/io/ClientSocket.hpp"
#include "castor/tape/aggregator/Constants.hpp"
#include "castor/tape/aggregator/Marshaller.hpp"
31
#include "castor/tape/aggregator/RcpJobRequest.hpp"
32
#include "castor/tape/aggregator/RcpJobReply.hpp"
33
#include "castor/tape/aggregator/RcpJobSubmitter.hpp"
34
#include "castor/tape/aggregator/SocketHelper.hpp"
35
36
37
38
39
40
41
#include "h/net.h"
#include "h/osdep.h"
#include "h/rtcp_constants.h"
#include "h/serrno.h"
#include "h/vdqm_constants.h"

#include <errno.h>
42
#include <string.h>
43
44
45
46
47
48
#include <unistd.h>


//------------------------------------------------------------------------------
// submit
//------------------------------------------------------------------------------
49
void castor::tape::aggregator::RcpJobSubmitter::submit(
50
51
52
53
54
55
56
57
58
59
60
61
  const std::string  &host,
  const unsigned int  port,
  const int           netReadWriteTimeout,
  const char         *remoteCopyType,
  const u_signed64    tapeRequestID,
  const std::string  &clientUserName,
  const std::string  &clientHost,
  const int           clientPort,
  const int           clientEuid,
  const int           clientEgid,
  const std::string  &deviceGroupName,
  const std::string  &tapeDriveName)
62
63
  throw (castor::tape::aggregator::exception::RTCPDErrorMessage,
    castor::exception::Exception) {
64

65
66
67
68
69
70
  RcpJobRequest request;

  // Check the validity of the parameters of this function
  if(clientHost.length() > sizeof(request.clientHost) - 1) {
    castor::exception::Exception ex(EINVAL);

71
72
    ex.getMessage() << __PRETTY_FUNCTION__
      << ": Length of clientHost string is too large: "
73
74
75
76
77
78
79
80
      "Maximum: " << (sizeof(request.clientHost) - 1) << " Actual: "
      << clientHost.length();

    throw ex;
  }
  if(deviceGroupName.length() > sizeof(request.deviceGroupName) - 1) {
    castor::exception::Exception ex(EINVAL);

81
82
    ex.getMessage() << __PRETTY_FUNCTION__
      << ": Length of deviceGroupName string is too large: "
83
84
85
86
87
88
89
90
      "Maximum: " << (sizeof(request.deviceGroupName) - 1) << " Actual: "
      << deviceGroupName.length();

    throw ex;
  }
  if(tapeDriveName.length() > sizeof(request.tapeDriveName) - 1) {
    castor::exception::Exception ex(EINVAL);

91
92
    ex.getMessage() << __PRETTY_FUNCTION__
      << ": Length of tapeDriveName string is too large: "
93
94
95
96
97
98
99
100
      "Maximum: " << (sizeof(request.tapeDriveName) - 1) << " Actual: "
      << tapeDriveName.length();

    throw ex;
  }
  if(clientUserName.length() > sizeof(request.clientUserName) - 1) {
    castor::exception::Exception ex(EINVAL);

101
102
    ex.getMessage() << __PRETTY_FUNCTION__
      << ": Length of clientUserName string is too large: "
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
      "Maximum: " << (sizeof(request.clientUserName) - 1) << " Actual: "
      << clientUserName.length();

    throw ex;
  }

  // Prepare the job submission request information ready for marshalling
  // The validity of the string length were check above
  request.tapeRequestID = tapeRequestID;
  request.clientPort    = clientPort;
  request.clientEuid    = clientEuid;
  request.clientEgid    = clientEgid;
  strcpy(request.clientHost     , clientHost.c_str());
  strcpy(request.deviceGroupName, deviceGroupName.c_str());
  strcpy(request.tapeDriveName  , tapeDriveName.c_str());
  strcpy(request.clientUserName , clientUserName.c_str());

  // Marshall the job submission request message
  char buf[MSGBUFSIZ];
  size_t totalLen = 0;

  try {
    totalLen = Marshaller::marshallRcpJobRequest(&buf[0], sizeof(buf), request);
  } catch(castor::exception::Exception &ex) {
    castor::exception::Internal ie;

129
130
    ie.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to marshall RCP job submission request message: "
131
132
133
134
135
136
      << ex.getMessage().str();

    throw ie;
  }

  // Connect to the RTCPD or tape aggregator daemon
137
138
139
140
141
142
143
  castor::io::ClientSocket socket(port, host);

  try {
    socket.connect();
  } catch(castor::exception::Exception &connectex) {
    castor::exception::Exception ex(ECONNABORTED);

144
145
    ex.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to connect to " << remoteCopyType
146
147
148
149
      << ": host: " << host << ": port: " << port << ": "
      << connectex.getMessage().str();
  }

150
151
  // Send the job submission request message to the RTCPD or tape aggregator
  // daemon
152
153
154
155
  try {
    SocketHelper::writeBytes(socket, netReadWriteTimeout, totalLen, buf);
  } catch(castor::exception::Exception &ex) {
    castor::exception::Exception ex2(SECOMERR);
156

157
158
159
160
161
162
    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to send the job submission request message to "
      << remoteCopyType
      << ": " << ex.getMessage().str();

    throw ex2;
163
164
  }

165
  // Read the reply from the RTCPD or tape aggregator daemon
166
  readReply(socket, netReadWriteTimeout, remoteCopyType);
167
168
169
170
171
172
}


//------------------------------------------------------------------------------
// readReply
//------------------------------------------------------------------------------
173
void castor::tape::aggregator::RcpJobSubmitter::readReply(
174
  castor::io::AbstractTCPSocket &socket, const int netReadWriteTimeout,
175
176
177
  const char *remoteCopyType)
  throw (castor::tape::aggregator::exception::RTCPDErrorMessage,
    castor::exception::Exception) {
178

179
  // Read and unmarshall the magic number
180
181
  uint32_t magic = 0;
  try {
182
    magic = SocketHelper::readUint32(socket, NETRWTIMEOUT);
183
184
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);
185

186
187
    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read magic number from " << remoteCopyType
188
      << ": " << ex.getMessage().str();
189

190
    throw ex2;
191
192
193
194
195
196
197
198
  }

  // If the magic number is invalid
  if(magic != RTCOPY_MAGIC && magic != RTCOPY_MAGIC_OLD0) {
    castor::exception::Exception ex(EBADMSG);

     ex.getMessage() << __PRETTY_FUNCTION__
       << std::hex
199
200
201
202
       << ": Invalid magic number from " << remoteCopyType
       << ": Expected: 0x" << RTCOPY_MAGIC << " or 0x"
       << RTCOPY_MAGIC_OLD0
       << ": Received: 0x" << magic;
203
204
205
206

     throw ex;
  }

207
  // Read and unmarshall the request type
208
  uint32_t reqtype = 0;
209
  try {
210
    reqtype = SocketHelper::readUint32(socket, NETRWTIMEOUT);
211
212
213
214
215
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);

    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read request type from " << remoteCopyType
216
      << ": " << ex.getMessage().str();
217
218
219

    throw ex2;
  }
220
221
222
223
224
225
226

  // If the request type is invalid
  if(reqtype != VDQM_CLIENTINFO) {
    castor::exception::Exception ex(EBADMSG);

    ex.getMessage() << __PRETTY_FUNCTION__
      << std::hex
227
228
229
      << ": Invalid request type from " << remoteCopyType
      << ": Expected: 0x" << VDQM_CLIENTINFO
      << ": Received: 0x" << reqtype;
230
231
232
233

    throw ex;
  }

234
235
236
  // Read and unmarshall the type of the request
  uint32_t len = 0;
  try {
237
    len = SocketHelper::readUint32(socket, NETRWTIMEOUT);
238
239
240
241
242
243
244
245
246
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);

    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read message body length from " << remoteCopyType
      << ": "<< ex.getMessage().str();

    throw ex2;
  }
247
248
249

  // If the message body is not large enough for a status number and an empty
  // error string
250
  if(len < sizeof(uint32_t) + 1) {
251
252
253
    castor::exception::Exception ex(EMSGSIZE);

    ex.getMessage() << __PRETTY_FUNCTION__
254
255
      << ": Invalid header from " << remoteCopyType
      << ": Message body not large enough for a status number and an empty "
256
         "string: Minimum size expected: " << (sizeof(uint32_t) + 1)
257
      << ": Received: " << len;
258
259
260
261

    throw ex;
  }

262
263
264
265
  // Only need a buffer for the message body, the header has already been read
  // from the socket and unmarshalled
  char body[MSGBUFSIZ - 3 * sizeof(uint32_t)];

266
  // If the message body is too large
267
  if(len > sizeof(body)) {
268
269
270
    castor::exception::Exception ex(EMSGSIZE);

    ex.getMessage() << __PRETTY_FUNCTION__
271
272
273
      << ": Message body from " <<  remoteCopyType << " is too large"
         ": Maximum: " << sizeof(body)
      << ": Received: " << len;
274
275
276
277

    throw ex;
  }

278
  // Read the message body
279
  try {
280
    SocketHelper::readBytes(socket, NETRWTIMEOUT, len, body);
281
282
283
284
285
286
287
288
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);

    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read message body from " << remoteCopyType
      << ": "<< ex.getMessage().str();

    throw ex2;
289
290
  }

291
292
293
294
295
296
297
298
299
300
301
302
  // Unmarshall the job submission reply
  const char  *p           = body;
  size_t      remainingLen = len;
  RcpJobReply reply;
  try {
    Marshaller::unmarshallRcpJobReply(p, remainingLen, reply);
  } catch(castor::exception::Exception &ex) {
    castor::exception::Internal ie;

    ie.getMessage() <<  __PRETTY_FUNCTION__
      << ": Failed to unmarshall job submission reply: "
      << ex.getMessage().str();
303

304
305
    throw ie;
  }
306
307

  // If RTCOPY or tape aggregator daemon returned an error message
308
309
  // Checking the size of the error message because the status maybe non-zero
  // even if there is no error
310
311
  if(strlen(reply.errorMessage) > 1) {
    castor::exception::Exception ex(reply.status);
312

313
314
    ex.getMessage() << __PRETTY_FUNCTION__
      << ": " << reply.errorMessage;
315
316
317
318

    throw ex;
  }
}