AcsMessageHandler.cpp 16.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/acs/Constants.hpp"
#include "castor/acs/AcsMessageHandler.hpp"
#include "castor/acs/AcsDismountTape.hpp"
25
#include "castor/acs/AcsForceDismountTape.hpp"
26
27
#include "castor/acs/AcsMountTapeReadOnly.hpp"
#include "castor/acs/AcsMountTapeReadWrite.hpp"
28
29
#include "castor/acs/Acs.hpp"
#include "castor/acs/AcsImpl.hpp"
Steven Murray's avatar
Steven Murray committed
30
31
32
33
34
#include "castor/messages/messages.hpp"
#include "castor/messages/ReturnValue.pb.h"
#include "castor/messages/AcsMountTapeReadOnly.pb.h"
#include "castor/messages/AcsMountTapeReadWrite.pb.h"
#include "castor/messages/AcsDismountTape.pb.h"
35
#include "castor/messages/AcsForceDismountTape.pb.h"
36
#include "serrno.h"
37
38
39
40
41
42
43
44
45
46
47

#include <sstream>

//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
// Stores the collaborators it is given, creates a ZMQ_ROUTER socket in the
// supplied ZMQ context and binds that socket to the loopback TCP endpoint
// built from castorConf.port.
//
// @param reactor            Stored in m_reactor.
// @param log                Logger used for all messages of this handler.
// @param hostName           Stored in m_hostName.
// @param zmqContext         ZMQ context handed to the m_socket constructor.
// @param castorConf         Daemon configuration; its port member selects the
//                           TCP port the socket is bound to.
// @param acsPendingRequests Stored in m_acsPendingRequests.
// @throw castor::exception::Exception if a castor exception is raised while
//        binding the socket (or while logging the successful bind).
castor::acs::AcsMessageHandler::AcsMessageHandler(
  castor::tape::reactor::ZMQReactor &reactor,
  log::Logger &log,
  const std::string &hostName,
  void *const zmqContext,
  const AcsDaemonConfig &castorConf,
  AcsPendingRequests &acsPendingRequests):
  m_reactor(reactor),
  m_log(log),
  m_socket(zmqContext, ZMQ_ROUTER),
  m_hostName(hostName),
  m_castorConf(castorConf),
  m_acsPendingRequests(acsPendingRequests) {

  // Build the endpoint string once; it is needed for the bind itself, for
  // the success log and for the failure message
  std::ostringstream endpointStream;
  endpointStream << "tcp://127.0.0.1:" << m_castorConf.port;
  const std::string endpoint = endpointStream.str();

  try {
    m_socket.bind(endpoint.c_str());
    log::Param params[] = {log::Param("endpoint", endpoint)};
    m_log(LOG_INFO, "Bound the ZMQ socket of the AcsMessageHandler",
      params);
  } catch(castor::exception::Exception &ne) {
    // Re-throw with the endpoint prepended so the failure can be diagnosed
    castor::exception::Exception ex;
    ex.getMessage() <<
      "Failed to bind the ZMQ socket of the AcsMessageHandler"
      ": endpoint=" << endpoint << ": " << ne.getMessage().str();
    throw ex;
  }
}

//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::acs::AcsMessageHandler::~AcsMessageHandler() throw() {
  // Intentionally empty: no explicit clean-up is performed here
}

//------------------------------------------------------------------------------
// getName
//------------------------------------------------------------------------------
std::string castor::acs::AcsMessageHandler::getName() const throw() {
  // Fixed, human-readable name identifying this handler
  const char *const handlerName = "AcsMessageHandler";
  return std::string(handlerName);
}

//------------------------------------------------------------------------------
// fillPollFd
//------------------------------------------------------------------------------
void castor::acs::AcsMessageHandler::fillPollFd(zmq_pollitem_t &fd) throw() {
  // Describe what to poll: this handler's ZMQ socket (the plain file
  // descriptor slot is set to -1) ...
  fd.socket  = m_socket.getZmqSocket();
  fd.fd      = -1;

  // ... and which events are of interest: incoming data only, with the
  // returned-events field reset
  fd.events  = ZMQ_POLLIN;
  fd.revents = 0;
}

//------------------------------------------------------------------------------
// handleEvent
//------------------------------------------------------------------------------
bool castor::acs::AcsMessageHandler::handleEvent(
  const zmq_pollitem_t &fd) throw() {
  // Try to receive a request, simply giving up if an exception is raised
  messages::Frame rqst;

  //for handling zeroMQ's router socket type specific elements 
  //ie first frame = identity of the sender
  //   second one  =  empty
  //   third and following = actual data frames
 
  //The ZmqMsg address data can be dump as string and used as key for storing 
  //the identity (for clients who need a late answer)
  castor::messages::ZmqMsg address;
  castor::messages::ZmqMsg empty;
  try {
    checkSocket(fd);
    m_socket.recv(address);
    m_socket.recv(empty);
    rqst = messages::recvFrame(m_socket);
  } catch(castor::exception::Exception &ex) {
    log::Param params[] = {log::Param("message", ex.getMessage().str())};
    m_log(LOG_ERR, "AcsMessageHandler failed to handle event", params);
    return false; // Give up and stay registered with the reactor
  }
  log::Param params[] = {
      log::Param("sender identity", 
Steven Murray's avatar
Steven Murray committed
129
              utils::hexDump(address.getData(),address.size()))
130
131
     };
  m_log(LOG_DEBUG, "handling event in AcsMessageHandler", params);
132
 
133
  // From this point on any exception thrown should be converted into an
134
  // Exception message and sent back to the client 
135
  messages::Frame reply;
136
137
  bool exceptionOccurred = false;
  
138
  try {
139
140
141
    //m_acsPendingRequests.checkAndAddRequest(address, empty, rqst, m_socket);
    // if there are any problems we need to send the replay to the client.
    
142
143
144
    reply = dispatchMsgHandler(rqst);
  } catch(castor::exception::Exception &ex) {
    reply = createExceptionFrame(ex.code(), ex.getMessage().str());
145
146
    exceptionOccurred=true;
    m_log(LOG_ERR, ex.getMessage().str());
147
148
  } catch(std::exception &se) {
    reply = createExceptionFrame(SEINTERNAL, se.what());
149
    exceptionOccurred=true;
150
151
152
    m_log(LOG_ERR, se.what());
  } catch(...) {
    reply = createExceptionFrame(SEINTERNAL, "Caught an unknown exception");  
153
    exceptionOccurred=true;
154
155
156
    m_log(LOG_ERR, "Caught an unknown exception");
  }

157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
  if (exceptionOccurred) {
    // Send the reply to the client if we were not able to add request to the
    // list
    try {
      //we need to prepend our frames the same way we received them
      // ie identity + empty frames 
      m_socket.send(address,ZMQ_SNDMORE);
      m_socket.send(empty,ZMQ_SNDMORE);
      messages::sendFrame(m_socket, reply);    
    } catch(castor::exception::Exception &ex) {
      log::Param params[] = {log::Param("message", ex.getMessage().str())};
      m_log(LOG_ERR, "AcsMessageHandler failed to send reply to client", params);
    }
  }
*/

174
175
176
177
178
179
  // Send the reply to the client
  try {
    //we need to prepend our frames the same way we received them
    // ie identity + empty frames 
    m_socket.send(address,ZMQ_SNDMORE);
    m_socket.send(empty,ZMQ_SNDMORE);
180

181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
    messages::sendFrame(m_socket, reply);
  } catch(castor::exception::Exception &ex) {
    log::Param params[] = {log::Param("message", ex.getMessage().str())};
    m_log(LOG_ERR, "AcsMessageHandler failed to send reply to client", params);
  }

  return false; // Stay registered with the reactor
}

//------------------------------------------------------------------------------
// checkSocket
//------------------------------------------------------------------------------
void castor::acs::AcsMessageHandler::checkSocket(
  const zmq_pollitem_t &fd) const {
  // The poll item is acceptable only if it refers to our own ZMQ socket
  if(m_socket.getZmqSocket() == fd.socket) {
    return;
  }

  castor::exception::Exception ex;
  ex.getMessage() << "AcsMessageHandler passed wrong poll item";
  throw ex;
}

//------------------------------------------------------------------------------
// dispatchMsgHandler
//------------------------------------------------------------------------------
// Selects the handler matching the message type found in the request header
// and returns the reply frame produced by that handler.
//
// @param rqst The request frame received from the client.
// @return The reply frame created by the selected handler.
// @throw castor::exception::Exception if the message type is not one of the
//        four ACS mount/dismount request types handled below; exceptions
//        thrown by the selected handler propagate unchanged.
castor::messages::Frame castor::acs::AcsMessageHandler::
  dispatchMsgHandler(const messages::Frame &rqst) {
  m_log(LOG_DEBUG, "AcsMessageHandler dispatching message handler");

  const messages::MsgType msgType =
    static_cast<messages::MsgType>(rqst.header.msgtype());
  switch(msgType) {
  case messages::MSG_TYPE_ACSMOUNTTAPEREADONLY:
    return handleAcsMountTapeReadOnly(rqst);

  case messages::MSG_TYPE_ACSMOUNTTAPEREADWRITE:
    return handleAcsMountTapeReadWrite(rqst);

  case messages::MSG_TYPE_ACSDISMOUNTTAPE:
    return handleAcsDismountTape(rqst);

  case messages::MSG_TYPE_ACSFORCEDISMOUNTTAPE:
    return handleAcsForceDismountTape(rqst);

  default:
    {
      // Report both the numeric value and its textual form
      const std::string msgTypeStr = messages::msgTypeToString(msgType);
      castor::exception::Exception ex;
      ex.getMessage() << "Failed to dispatch message handler"
        ": Unexpected request type: msgType=" << msgType << " msgTypeStr=" <<
        msgTypeStr;
      throw ex;
    }
  }
}

//------------------------------------------------------------------------------
237
// handleAcsMountTapeReadOnly
238
239
//------------------------------------------------------------------------------
// Handles an AcsMountTapeReadOnly request: parses the request body, runs the
// read-only mount through an AcsMountTapeReadOnly object and, on success,
// returns a return-value frame carrying 0.
//
// @param rqst The request frame whose body is an AcsMountTapeReadOnly message.
// @return A return-value frame with value 0.
// @throw castor::exception::Exception if a castor exception is raised while
//        parsing the request or executing the mount.
castor::messages::Frame castor::acs::AcsMessageHandler::
  handleAcsMountTapeReadOnly(const messages::Frame &rqst) {
  m_log(LOG_DEBUG, "Handling AcsMountTapeReadOnly message");

  try {
    messages::AcsMountTapeReadOnly rqstBody;
    rqst.parseBodyIntoProtocolBuffer(rqstBody);

    // rqstBody outlives every use of vid, so binding a reference is safe
    const std::string &vid = rqstBody.vid();
    const uint32_t acs     = rqstBody.acs();
    const uint32_t lsm     = rqstBody.lsm();
    const uint32_t panel   = rqstBody.panel();
    const uint32_t drive   = rqstBody.drive();

    log::Param params[] = {log::Param("TPVID", vid),
      log::Param("acs", acs),
      log::Param("lsm", lsm),
      log::Param("panel", panel),
      log::Param("drive", drive)};
    m_log(LOG_INFO, "Mount tape for read only access", params);

    castor::acs::AcsImpl acsWrapper;
    castor::acs::AcsMountTapeReadOnly acsMountTapeReadOnly(vid, acs, lsm,
      panel, drive, acsWrapper, m_log, m_castorConf);
    try {
      acsMountTapeReadOnly.execute();
      m_log(LOG_INFO,"Tape successfully mounted for read only access", params);
    } catch (castor::exception::Exception &ne) {
      // Log here, where the tape parameters are still in scope, then let the
      // outer handler add the generic context
      m_log(LOG_ERR,"Tape mount for read only access failed: "
        + ne.getMessage().str(), params);
      throw;
    }
    return createReturnValueFrame(0);
  } catch(castor::exception::Exception &ne) {
    // NOTE(review): only the message of ne is carried over; ex is
    // default-constructed, so the code of ne is not copied - confirm this is
    // intended.
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle AcsMountTapeReadOnly message: " <<
      ne.getMessage().str();
    throw ex;
  }
}

//------------------------------------------------------------------------------
282
// handleAcsMountTapeReadWrite
283
284
//------------------------------------------------------------------------------
// Handles an AcsMountTapeReadWrite request: parses the request body, runs the
// read/write mount through an AcsMountTapeReadWrite object and, on success,
// returns a return-value frame carrying 0.
//
// @param rqst The request frame whose body is an AcsMountTapeReadWrite
//             message.
// @return A return-value frame with value 0.
// @throw castor::exception::Exception if a castor exception is raised while
//        parsing the request or executing the mount.
castor::messages::Frame castor::acs::AcsMessageHandler::
  handleAcsMountTapeReadWrite(const messages::Frame &rqst) {
  m_log(LOG_DEBUG, "Handling AcsMountTapeReadWrite message");

  try {
    messages::AcsMountTapeReadWrite rqstBody;
    rqst.parseBodyIntoProtocolBuffer(rqstBody);

    // rqstBody outlives every use of vid, so binding a reference is safe
    const std::string &vid = rqstBody.vid();
    const uint32_t acs     = rqstBody.acs();
    const uint32_t lsm     = rqstBody.lsm();
    const uint32_t panel   = rqstBody.panel();
    const uint32_t drive   = rqstBody.drive();

    log::Param params[] = {log::Param("TPVID", vid),
      log::Param("acs", acs),
      log::Param("lsm", lsm),
      log::Param("panel", panel),
      log::Param("drive", drive)};
    m_log(LOG_INFO, "Mount tape for read/write access", params);

    castor::acs::AcsImpl acsWrapper;
    castor::acs::AcsMountTapeReadWrite acsMountTapeReadWrite(vid, acs,
      lsm, panel, drive, acsWrapper, m_log, m_castorConf);
    try {
      acsMountTapeReadWrite.execute();
      m_log(LOG_INFO,"Tape successfully mounted for read/write access", params);
    } catch (castor::exception::Exception &ne) {
      // Log here, where the tape parameters are still in scope, then let the
      // outer handler add the generic context
      m_log(LOG_ERR,"Tape mount for read/write access failed: "
        + ne.getMessage().str(), params);
      throw;
    }
    return createReturnValueFrame(0);
  } catch(castor::exception::Exception &ne) {
    // NOTE(review): only the message of ne is carried over; ex is
    // default-constructed, so the code of ne is not copied - confirm this is
    // intended.
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle AcsMountTapeReadWrite message: " <<
      ne.getMessage().str();
    throw ex;
  }
}

//------------------------------------------------------------------------------
// handleAcsDismountTape
//------------------------------------------------------------------------------
// Handles an AcsDismountTape request: parses the request body, runs the
// dismount through an AcsDismountTape object and, on success, returns a
// return-value frame carrying 0.
//
// @param rqst The request frame whose body is an AcsDismountTape message.
// @return A return-value frame with value 0.
// @throw castor::exception::Exception on any failure; exceptions of other
//        types are converted into a castor exception as well.
castor::messages::Frame castor::acs::AcsMessageHandler::
  handleAcsDismountTape(const messages::Frame& rqst) {
  m_log(LOG_DEBUG, "Handling AcsDismountTape message");

  try {
    messages::AcsDismountTape rqstBody;
    rqst.parseBodyIntoProtocolBuffer(rqstBody);

    // rqstBody outlives every use of vid, so binding a reference is safe
    const std::string &vid = rqstBody.vid();
    const uint32_t acs     = rqstBody.acs();
    const uint32_t lsm     = rqstBody.lsm();
    const uint32_t panel   = rqstBody.panel();
    const uint32_t drive   = rqstBody.drive();

    log::Param params[] = {log::Param("TPVID", vid),
      log::Param("acs", acs),
      log::Param("lsm", lsm),
      log::Param("panel", panel),
      log::Param("drive", drive)};
    m_log(LOG_INFO, "Dismount tape", params);

    castor::acs::AcsImpl acsWrapper;
    castor::acs::AcsDismountTape acsDismountTape(vid, acs, lsm, panel, drive,
      acsWrapper, m_log, m_castorConf);
    try {
      acsDismountTape.execute();
      m_log(LOG_INFO,"Tape successfully dismounted", params);
    } catch (castor::exception::Exception &ne) {
      // Log here, where the tape parameters are still in scope, then let the
      // outer handler add the generic context
      m_log(LOG_ERR,"Tape dismount failed: "+ne.getMessage().str(), params);
      throw;
    }
    return createReturnValueFrame(0);
  } catch(castor::exception::Exception &ne) {
    // NOTE(review): only the message of ne is carried over; ex is
    // default-constructed, so the code of ne is not copied - confirm this is
    // intended.
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle AcsDismountTape message: " <<
      ne.getMessage().str();
    throw ex;
  } catch(...) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle AcsDismountTape message: "
                    << "Caught an unknown exception";
    throw ex;
  }
}

375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
//------------------------------------------------------------------------------
// handleAcsForceDismountTape
//------------------------------------------------------------------------------
// Handles an AcsForceDismountTape request: parses the request body, runs the
// forced dismount through an AcsForceDismountTape object and, on success,
// returns a return-value frame carrying 0.
//
// @param rqst The request frame whose body is an AcsForceDismountTape
//             message.
// @return A return-value frame with value 0.
// @throw castor::exception::Exception on any failure; exceptions of other
//        types are converted into a castor exception as well.
castor::messages::Frame castor::acs::AcsMessageHandler::
  handleAcsForceDismountTape(const messages::Frame& rqst) {
  // Name the message actually being handled (the text used to be a copy of
  // the one in the plain dismount handler)
  m_log(LOG_DEBUG, "Handling AcsForceDismountTape message");

  try {
    messages::AcsForceDismountTape rqstBody;
    rqst.parseBodyIntoProtocolBuffer(rqstBody);

    // rqstBody outlives every use of vid, so binding a reference is safe
    const std::string &vid = rqstBody.vid();
    const uint32_t acs     = rqstBody.acs();
    const uint32_t lsm     = rqstBody.lsm();
    const uint32_t panel   = rqstBody.panel();
    const uint32_t drive   = rqstBody.drive();

    log::Param params[] = {log::Param("TPVID", vid),
      log::Param("acs", acs),
      log::Param("lsm", lsm),
      log::Param("panel", panel),
      log::Param("drive", drive)};
    m_log(LOG_INFO, "Force dismount tape", params);

    castor::acs::AcsImpl acsWrapper;
    castor::acs::AcsForceDismountTape acsForceDismountTape(vid, acs, lsm,
      panel, drive, acsWrapper, m_log, m_castorConf);
    try {
      acsForceDismountTape.execute();
      m_log(LOG_INFO,"Tape successfully force dismounted", params);
    } catch (castor::exception::Exception &ne) {
      // Log here, where the tape parameters are still in scope, then let the
      // outer handler add the generic context
      m_log(LOG_ERR,"Tape force dismount failed: "+ne.getMessage().str(),
        params);
      throw;
    }
    return createReturnValueFrame(0);
  } catch(castor::exception::Exception &ne) {
    // NOTE(review): only the message of ne is carried over; ex is
    // default-constructed, so the code of ne is not copied - confirm this is
    // intended.
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle AcsForceDismountTape message: " <<
      ne.getMessage().str();
    throw ex;
  } catch(...) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle AcsForceDismountTape message: "
                    << "Caught an unknown exception";
    throw ex;
  }
}

425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
//------------------------------------------------------------------------------
// createReturnValueFrame
//------------------------------------------------------------------------------
// Builds a frame of type MSG_TYPE_RETURNVALUE whose body is a ReturnValue
// message carrying the given value.
//
// @param value The value to store in the ReturnValue body.
// @return The completed frame (header and serialized body).
castor::messages::Frame castor::acs::AcsMessageHandler::
  createReturnValueFrame(const int value) {
  messages::Frame frame;

  frame.header = castor::messages::protoTapePreFillHeader();
  frame.header.set_msgtype(messages::MSG_TYPE_RETURNVALUE);
  frame.header.set_bodysignature("PIPO");

  messages::ReturnValue body;
  body.set_value(value);
  frame.serializeProtocolBufferIntoBody(body);

  // Hash the body only after it has been serialized, so the value stored in
  // the header describes the payload that is actually sent (it used to be
  // computed while frame.body had not been filled in yet)
  frame.header.set_bodyhashvalue(messages::computeSHA1Base64(frame.body));

  return frame;
}

//------------------------------------------------------------------------------
// createExceptionFrame
//------------------------------------------------------------------------------
// Builds a frame of type MSG_TYPE_EXCEPTION whose body is an Exception
// message carrying the given error code and text.
//
// @param code The error code to store in the Exception body.
// @param msg  The error message to store in the Exception body.
// @return The completed frame (header and serialized body).
castor::messages::Frame castor::acs::AcsMessageHandler::
  createExceptionFrame(const int code, const std::string& msg) {
  messages::Frame frame;

  frame.header = castor::messages::protoTapePreFillHeader();
  frame.header.set_msgtype(messages::MSG_TYPE_EXCEPTION);
  frame.header.set_bodysignature("PIPO");

  messages::Exception body;
  body.set_code(code);
  body.set_message(msg);
  frame.serializeProtocolBufferIntoBody(body);

  // Hash the body only after it has been serialized, so the value stored in
  // the header describes the payload that is actually sent (it used to be
  // computed while frame.body had not been filled in yet)
  frame.header.set_bodyhashvalue(messages::computeSHA1Base64(frame.body));

  return frame;
}