RcpJobSubmitter.cpp 10 KB
Newer Older
1
/******************************************************************************
 *                      RcpJobSubmitter.cpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *
 *
 *
 * @author Steven Murray Steven.Murray@cern.ch
 *****************************************************************************/

26
#include "castor/exception/Internal.hpp"
27
28
29
30
#include "castor/exception/InvalidArgument.hpp"
#include "castor/io/ClientSocket.hpp"
#include "castor/tape/aggregator/Constants.hpp"
#include "castor/tape/aggregator/Marshaller.hpp"
31
32
#include "castor/tape/aggregator/RcpJobRequest.hpp"
#include "castor/tape/aggregator/RcpJobSubmitter.hpp"
33
#include "castor/tape/aggregator/SocketHelper.hpp"
34
35
36
37
38
39
40
#include "h/net.h"
#include "h/osdep.h"
#include "h/rtcp_constants.h"
#include "h/serrno.h"
#include "h/vdqm_constants.h"

#include <errno.h>
41
#include <string.h>
42
43
44
45
46
47
#include <unistd.h>


//------------------------------------------------------------------------------
// submit
//------------------------------------------------------------------------------
48
void castor::tape::aggregator::RcpJobSubmitter::submit(
49
50
51
52
53
54
55
56
57
58
59
60
  const std::string  &host,
  const unsigned int  port,
  const int           netReadWriteTimeout,
  const char         *remoteCopyType,
  const u_signed64    tapeRequestID,
  const std::string  &clientUserName,
  const std::string  &clientHost,
  const int           clientPort,
  const int           clientEuid,
  const int           clientEgid,
  const std::string  &deviceGroupName,
  const std::string  &tapeDriveName)
61
62
  throw (castor::tape::aggregator::exception::RTCPDErrorMessage,
    castor::exception::Exception) {
63

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
  RcpJobRequest request;

  // Check the validity of the parameters of this function
  if(clientHost.length() > sizeof(request.clientHost) - 1) {
    castor::exception::Exception ex(EINVAL);

    ex.getMessage() << "Length of clientHost string is too large: "
      "Maximum: " << (sizeof(request.clientHost) - 1) << " Actual: "
      << clientHost.length();

    throw ex;
  }
  if(deviceGroupName.length() > sizeof(request.deviceGroupName) - 1) {
    castor::exception::Exception ex(EINVAL);

    ex.getMessage() << "Length of deviceGroupName string is too large: "
      "Maximum: " << (sizeof(request.deviceGroupName) - 1) << " Actual: "
      << deviceGroupName.length();

    throw ex;
  }
  if(tapeDriveName.length() > sizeof(request.tapeDriveName) - 1) {
    castor::exception::Exception ex(EINVAL);

    ex.getMessage() << "Length of tapeDriveName string is too large: "
      "Maximum: " << (sizeof(request.tapeDriveName) - 1) << " Actual: "
      << tapeDriveName.length();

    throw ex;
  }
  if(clientUserName.length() > sizeof(request.clientUserName) - 1) {
    castor::exception::Exception ex(EINVAL);

    ex.getMessage() << "Length of clientUserName string is too large: "
      "Maximum: " << (sizeof(request.clientUserName) - 1) << " Actual: "
      << clientUserName.length();

    throw ex;
  }

  // Prepare the job submission request information ready for marshalling
  // The validity of the string length were check above
  request.tapeRequestID = tapeRequestID;
  request.clientPort    = clientPort;
  request.clientEuid    = clientEuid;
  request.clientEgid    = clientEgid;
  strcpy(request.clientHost     , clientHost.c_str());
  strcpy(request.deviceGroupName, deviceGroupName.c_str());
  strcpy(request.tapeDriveName  , tapeDriveName.c_str());
  strcpy(request.clientUserName , clientUserName.c_str());

  // Marshall the job submission request message
  char buf[MSGBUFSIZ];
  size_t totalLen = 0;

  try {
    totalLen = Marshaller::marshallRcpJobRequest(&buf[0], sizeof(buf), request);
  } catch(castor::exception::Exception &ex) {
    castor::exception::Internal ie;

    ie.getMessage()
      << "Failed to marshall RCP job submission request message: "
      << ex.getMessage().str();

    throw ie;
  }

  // Connect to the RTCPD or tape aggregator daemon
132
133
134
135
136
137
138
139
140
141
142
143
  castor::io::ClientSocket socket(port, host);

  try {
    socket.connect();
  } catch(castor::exception::Exception &connectex) {
    castor::exception::Exception ex(ECONNABORTED);

    ex.getMessage() << "Failed to connect to " << remoteCopyType
      << ": host: " << host << ": port: " << port << ": "
      << connectex.getMessage().str();
  }

144
145
146
  // Send the job submission request message to the RTCPD or tape aggregator
  // daemon
  const int rc = netwrite_timeout(socket.socket(), buf, totalLen,
147
    netReadWriteTimeout);
148

149
150
  if(rc == -1) {
    castor::exception::Exception ex(SECOMERR);
151
152
153
    ex.getMessage() << __PRETTY_FUNCTION__
      << ": netwrite(REQ) to " << remoteCopyType << ": " << neterror();
    throw ex;
154
155
  } else if(rc == 0) {
    castor::exception::Exception ex(SECONNDROP);
156
    ex.getMessage() << __PRETTY_FUNCTION__
157
      << ": netwrite(REQ) to " << remoteCopyType << ": Connection dropped";
158
159
160
    throw ex;
  }

161
  // Read the reply from the RTCPD or tape aggregator daemon
162
  readReply(socket, netReadWriteTimeout, remoteCopyType);
163
164
165
166
167
168
}


//------------------------------------------------------------------------------
// readReply
//------------------------------------------------------------------------------
169
void castor::tape::aggregator::RcpJobSubmitter::readReply(
170
  castor::io::AbstractTCPSocket &socket, const int netReadWriteTimeout,
171
172
173
  const char *remoteCopyType)
  throw (castor::tape::aggregator::exception::RTCPDErrorMessage,
    castor::exception::Exception) {
174

175
176
177
  // Read and unmarshall the magic number of the request
  uint32_t magic = 0;
  try {
178
    magic = SocketHelper::readUint32(socket, NETRWTIMEOUT);
179
180
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);
181

182
183
184
    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read magic number from " << remoteCopyType
      << ": "<< ex.getMessage().str();
185

186
    throw ex2;
187
188
189
190
191
192
193
194
  }

  // If the magic number is invalid
  if(magic != RTCOPY_MAGIC && magic != RTCOPY_MAGIC_OLD0) {
    castor::exception::Exception ex(EBADMSG);

     ex.getMessage() << __PRETTY_FUNCTION__
       << std::hex
195
196
197
       << ": Invalid header from " << remoteCopyType
       << ": Invalid magic number: Expected: 0x" << RTCOPY_MAGIC << " or 0x"
       << RTCOPY_MAGIC_OLD0 << ": Received: 0x" << magic;
198
199
200
201

     throw ex;
  }

202
  // Read and unmarshall the type of the request
203
  uint32_t reqtype = 0;
204
  try {
205
    reqtype = SocketHelper::readUint32(socket, NETRWTIMEOUT);
206
207
208
209
210
211
212
213
214
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);

    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read request type from " << remoteCopyType
      << ": "<< ex.getMessage().str();

    throw ex2;
  }
215
216
217
218
219
220
221

  // If the request type is invalid
  if(reqtype != VDQM_CLIENTINFO) {
    castor::exception::Exception ex(EBADMSG);

    ex.getMessage() << __PRETTY_FUNCTION__
      << std::hex
222
223
224
      << ": Invalid header from " << remoteCopyType
      << ": Invalid request type: Expected: 0x" << VDQM_CLIENTINFO
      << ": Received 0x" << reqtype;
225
226
227
228

    throw ex;
  }

229
230
231
  // Read and unmarshall the type of the request
  uint32_t len = 0;
  try {
232
    len = SocketHelper::readUint32(socket, NETRWTIMEOUT);
233
234
235
236
237
238
239
240
241
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);

    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read message body length from " << remoteCopyType
      << ": "<< ex.getMessage().str();

    throw ex2;
  }
242
243
244

  // If the message body is not large enough for a status number and an empty
  // error string
245
  if(len < sizeof(uint32_t) + 1) {
246
247
248
    castor::exception::Exception ex(EMSGSIZE);

    ex.getMessage() << __PRETTY_FUNCTION__
249
250
251
      << ": Invalid header from " << remoteCopyType
      << ": Message body not large enough for a status number and an empty "
      "string: Minimum size expected: " << (sizeof(uint32_t) + 1)
252
      << ": Received: " << len;
253
254
255
256

    throw ex;
  }

257
258
259
260
  // Only need a buffer for the message body, the header has already been read
  // from the socket and unmarshalled
  char body[MSGBUFSIZ - 3 * sizeof(uint32_t)];

261
  // If the message body is too large
262
  if(len > sizeof(body)) {
263
264
265
    castor::exception::Exception ex(EMSGSIZE);

    ex.getMessage() << __PRETTY_FUNCTION__
266
267
      << ": Invalid header from " << remoteCopyType
      << ": Message body too large: Maximum length: "
268
      << sizeof(body) << " Actual length: " << len;
269
270
271
272

    throw ex;
  }

273
274
  // Read the message body from the socket
  try {
275
    SocketHelper::readBytes(socket, NETRWTIMEOUT, len, body);
276
277
278
279
280
281
282
283
  } catch (castor::exception::Exception &ex) {
    castor::exception::Exception ex2(EIO);

    ex2.getMessage() << __PRETTY_FUNCTION__
      << ": Failed to read message body from " << remoteCopyType
      << ": "<< ex.getMessage().str();

    throw ex2;
284
285
286
  }

  // Unmarshall the status number
287
288
289
290
  char     *p           = body;
  size_t   remainingLen = len;
  uint32_t status       = 0;
  Marshaller::unmarshallUint32(p, remainingLen, status);
291
292
293

  // Unmarshall the error message
  char errMsg[1024];
294
295
  const size_t errMsgSize = remainingLen < sizeof(errMsg) ?  remainingLen :
    sizeof(errMsg);
296
297
298
299
300
  errMsg[0] = '\0';
  strncpy(errMsg, p, errMsgSize);
  errMsg[errMsgSize - 1] = '\0';

  // If RTCOPY or tape aggregator daemon returned an error message
301
302
303
  // Checking the size of the error message because the status maybe non-zero
  // even if there is no error
  if(errMsgSize > 1) {
304
    castor::exception::Exception ex(status);
305

306
    ex.getMessage() << errMsg;
307
308
309
310

    throw ex;
  }
}