TapeMessageHandler.cpp 12.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
19
 * @author Castor Dev team, castor-dev@cern.ch
20
21
22
 *****************************************************************************/


Steven Murray's avatar
Steven Murray committed
23
#include "castor/messages/Constants.hpp"
24
#include "castor/messages/Header.pb.h"
Steven Murray's avatar
Steven Murray committed
25
26
#include "castor/messages/messages.hpp"
#include "castor/messages/NotifyDrive.pb.h"
27
28
#include "castor/tape/tapeserver/daemon/Constants.hpp"
#include "castor/tape/tapeserver/daemon/TapeMessageHandler.hpp"
29
30
31
#include "castor/tape/utils/utils.hpp"
#include "castor/utils/utils.hpp"
#include "h/Ctape.h"
32
#include "h/vmgr_constants.h"
33

34
35
36
37
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::TapeMessageHandler::TapeMessageHandler(
38
  reactor::ZMQReactor &reactor,
39
40
41
  log::Logger &log,DriveCatalogue &driveCatalogue,
  const std::string &hostName,
  castor::legacymsg::VdqmProxy & vdqm,
42
43
  castor::legacymsg::VmgrProxy & vmgr,
  void *const zmqContext):
44
  m_reactor(reactor),
45
46
  m_log(log),
  m_socket(zmqContext, ZMQ_REP),
47
48
49
  m_driveCatalogue(driveCatalogue),
  m_hostName(hostName),
  m_vdqm(vdqm),
50
51
52
53
  m_vmgr(vmgr) { 

  std::ostringstream endpoint;
  endpoint << "tcp://127.0.0.1:" << TAPE_SERVER_INTERNAL_LISTENING_PORT;
54
  
55
56
57
58
59
60
61
62
63
64
65
  try {
    m_socket.bind(endpoint.str().c_str());
    log::Param params[] = {log::Param("endpoint", endpoint.str())};
    m_log(LOG_INFO, "Bound the ZMQ_REP socket of the TapeMessageHandler",
      params);
  } catch(castor::exception::Exception &ne){
    castor::exception::Exception ex;
    ex.getMessage() <<
      "Failed to bind the ZMQ_REP socket of the TapeMessageHandler"
      ": endpoint=" << endpoint.str() << ": " << ne.getMessage().str();
    throw ex;
66
67
68
  }
}

Steven Murray's avatar
Steven Murray committed
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::TapeMessageHandler::~TapeMessageHandler()
  throw() {
}

//------------------------------------------------------------------------------
// getName
//------------------------------------------------------------------------------
std::string castor::tape::tapeserver::daemon::TapeMessageHandler::getName() 
  const throw() {
  return "TapeMessageHandler";
}

84
85
86
87
//------------------------------------------------------------------------------
// fillPollFd
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeMessageHandler::fillPollFd(
Steven Murray's avatar
Steven Murray committed
88
89
  zmq_pollitem_t &fd) throw() {
  fd.events = ZMQ_POLLIN;
90
  fd.revents = 0;
91
  fd.socket = m_socket.getZmqSocket();
92
93
94
95
96
97
98
  fd.fd= -1;
}

//------------------------------------------------------------------------------
// handleEvent
//------------------------------------------------------------------------------
bool castor::tape::tapeserver::daemon::TapeMessageHandler::handleEvent(
Steven Murray's avatar
Steven Murray committed
99
  const zmq_pollitem_t &fd) {
100
101
102
103
104
  checkSocket(fd);
  m_log(LOG_INFO,"handling event in TapeMessageHandler");
  messages::Header header; 
  
  try{
105
106
    tape::utils::ZmqMsg headerBlob;
    m_socket.recv(&headerBlob.getZmqMsg());
107
    
108
    if(!zmq_msg_more(&headerBlob.getZmqMsg())){
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
      castor::exception::Exception ex;
      ex.getMessage() <<"Read header blob, expecting a multi parts message but nothing to come";
      throw ex;
    }
    
    header = buildHeader(headerBlob);
  }
  catch(const castor::exception::Exception& ex){
    m_log(LOG_ERR,"Error while dealing the message's header");
    return false;
  }
  
  dispatchEvent(header);
  return false; // Ask reactor to remove and delete this handler
}

//------------------------------------------------------------------------------
// checkSocket
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeMessageHandler::checkSocket(
Steven Murray's avatar
Steven Murray committed
129
  const zmq_pollitem_t &fd) {
130
  void* underlyingSocket = m_socket.getZmqSocket();
131
132
133
134
135
136
137
138
  if(fd.socket != underlyingSocket){
    castor::exception::Exception ex;
    ex.getMessage() <<"TapeMessageHandler passed wrong poll item";
    throw ex;
  }
}


139
140
141
//------------------------------------------------------------------------------
// buildHeader
//------------------------------------------------------------------------------
142
castor::messages::Header castor::tape::tapeserver::daemon::TapeMessageHandler::buildHeader(
143
  tape::utils::ZmqMsg &headerBlob){  
144
  messages::Header header; 
145
146
  const bool headerCorrectlyParsed = header.ParseFromArray(
    zmq_msg_data(&headerBlob.getZmqMsg()), zmq_msg_size(&headerBlob.getZmqMsg()));
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
  
  if(!headerCorrectlyParsed){
    castor::exception::Exception ex;
    ex.getMessage() <<"TapeMessageHandler received a bad header while handling event";
    throw ex;
  }
    if(header.magic() != TPMAGIC){
    castor::exception::Exception ex;
    ex.getMessage() <<"Wrong magic number in the header";
    throw ex;
  }
  if(header.protocoltype() != messages::protocolType::Tape){
    castor::exception::Exception ex;
    ex.getMessage() <<"Wrong protocol specified in the header";
    throw ex;
  }
  return header;
}

166
167
168
//------------------------------------------------------------------------------
// dispatchEvent
//------------------------------------------------------------------------------
169
void castor::tape::tapeserver::daemon::TapeMessageHandler::dispatchEvent(
170
  messages::Header& header) {
171
  m_log(LOG_INFO,"dispatching  event in TapeMessageHandler");
172
173
  tape::utils::ZmqMsg bodyBlob;
  m_socket.recv(&bodyBlob.getZmqMsg());
174
175
176
177
178
  
  switch(header.reqtype()){
    case messages::reqType::Heartbeat:
    {
      castor::messages::Heartbeat body;
179
      unserialize(body,bodyBlob);
180
      castor::messages::checkSHA1(header,bodyBlob);
181
182
183
      dealWith(header,body);
    }
      break;
184
185
186
187
    case messages::reqType::NotifyDriveBeforeMountStarted:
    {
      castor::messages::NotifyDriveBeforeMountStarted body;
      unserialize(body,bodyBlob);
188
      castor::messages::checkSHA1(header,bodyBlob);
189
190
191
192
193
194
195
      dealWith(header,body);
    }
      break;
    case messages::reqType::NotifyDriveTapeMounted:
    {
      castor::messages::NotifyDriveTapeMounted body;
      unserialize(body,bodyBlob);
196
      castor::messages::checkSHA1(header,bodyBlob);
197
198
199
200
      dealWith(header,body);
    }
      break;
    case messages::reqType::NotifyDriveTapeUnmounted:
201
      sendSuccessReplyToClient();
202
203
      break;
    case messages::reqType::NotifyDriveUnmountStarted:
204
      sendSuccessReplyToClient();
205
      break;
206
    default:
207
      m_log(LOG_ERR,"default  dispatch in TapeMessageHandler");
208
209
210
211
      break;
  }
}

212
213
214
//------------------------------------------------------------------------------
// dealWith
//------------------------------------------------------------------------------
215
216
217
218
219
220
221
void castor::tape::tapeserver::daemon::TapeMessageHandler::dealWith(
const castor::messages::Header& header, const castor::messages::Heartbeat& body){
  
  std::vector<log::Param> param;
  param.push_back(log::Param("bytesMoved",body.bytesmoved()));
  m_log(LOG_INFO,"IT IS ALIVE",param);
  
222
  sendSuccessReplyToClient();
223
224
}

225
226
227
228
229
void castor::tape::tapeserver::daemon::TapeMessageHandler::dealWith(
const castor::messages::Header& header, 
const castor::messages::NotifyDriveBeforeMountStarted& body){
  m_log(LOG_INFO,"NotifyDriveBeforeMountStarted-dealWith");
  //check castor consistensy
230
  if(body.mode()==castor::messages::TAPE_MODE_READWRITE) {
231
232
233
    legacymsg::VmgrTapeInfoMsgBody tapeInfo;
    m_vmgr.queryTape(body.vid(), tapeInfo);
    // If the client is the tape gateway and the volume is not marked as BUSY
234
235
    if(body.clienttype() == castor::messages::NotifyDriveBeforeMountStarted::CLIENT_TYPE_GATEWAY 
            && !(tapeInfo.status & TAPE_BUSY)) {
236
237
      castor::exception::Exception ex;
      ex.getMessage() << "The tape gateway is the client and the tape to be mounted is not BUSY: vid=" << body.vid();
238
239
240
      
      //send an error to the client
      sendErrorReplyToClient(ex);
241
242
      throw ex;
    }
243
    
244
245
246
    castor::messages::NotifyDriveBeforeMountStartedAnswer body;
    body.set_howmanyfilesontape(tapeInfo.nbFiles);
    
247
248
    castor::messages::Header header = castor::messages::preFillHeader();
    header.set_reqtype(messages::reqType::NotifyDriveBeforeMountStartedAnswer);
249
    header.set_bodyhashvalue(castor::messages::computeSHA1Base64(body));
250
251
    header.set_bodysignature("PIPO");
    
252
    //send the number of files on the tape to the client
253
254
255
    castor::messages::sendMessage(m_socket,header,ZMQ_SNDMORE);
    castor::messages::sendMessage(m_socket,body);
  } else {
256
257
    //we were not event in castor::messages::TAPE_MODE_READWRITE mpde
    //so send empty answer
258
    sendSuccessReplyToClient();
259
  }
260
}
261
262
263
//------------------------------------------------------------------------------
// dealWith
//------------------------------------------------------------------------------
264
265
266
267
void castor::tape::tapeserver::daemon::TapeMessageHandler::dealWith(
const castor::messages::Header& header, 
const castor::messages::NotifyDriveTapeMounted& body){
  m_log(LOG_INFO,"NotifyDriveTapeMounted-dealWith");
Daniele Kruse's avatar
Daniele Kruse committed
268
269
270
  DriveCatalogueEntry *const drive = m_driveCatalogue.findDrive(body.unitname());
  drive->updateVolumeInfo(body);
  const utils::DriveConfig &driveConfig = drive->getConfig();
271
272
    
  const std::string vid = body.vid();
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
  try
  {
    switch(body.mode()) {
      case castor::messages::TAPE_MODE_READ:
        m_vmgr.tapeMountedForRead(vid);
        break;
      case castor::messages::TAPE_MODE_READWRITE:
        m_vmgr.tapeMountedForWrite(vid);
        break;
      case castor::messages::TAPE_MODE_DUMP:
        m_vmgr.tapeMountedForRead(vid);
        break;
      case castor::messages::TAPE_MODE_NONE:
        break;
      default:
        castor::exception::Exception ex;
        ex.getMessage() << "Unknown tape mode: " << body.mode();
        throw ex;
    }
    m_vdqm.tapeMounted(m_hostName, body.unitname(), driveConfig.dgn, body.vid(),
            drive->getSessionPid());
  }catch(const castor::exception::Exception& ex){
    sendErrorReplyToClient(ex);
    throw;
297
298
  }
  
299
  sendSuccessReplyToClient();
300
}
301
302
303
//------------------------------------------------------------------------------
// sendSuccessReplyToClient
//------------------------------------------------------------------------------
304
void castor::tape::tapeserver::daemon::TapeMessageHandler::sendSuccessReplyToClient(){
305
    castor::messages::ReturnValue body;
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
    sendReplyToClient(0,"");
}
//------------------------------------------------------------------------------
// sendErrorReplyToClient
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeMessageHandler::
sendErrorReplyToClient(const castor::exception::Exception& ex){
  //any positive value will trigger an exception in the client side
  sendReplyToClient(ex.code(),ex.getMessageValue());
}
//------------------------------------------------------------------------------
// sendReplyToClient
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeMessageHandler::
sendReplyToClient(int returnValue,const std::string& msg){
    castor::messages::ReturnValue body;
    body.set_returnvalue(returnValue);
    body.set_message(msg);
324
325
326
327
328
329
  
    castor::messages::Header header = castor::messages::preFillHeader();
    header.set_reqtype(messages::reqType::ReturnValue);
    header.set_bodyhashvalue(castor::messages::computeSHA1Base64(body));
    header.set_bodysignature("PIPO");

330
331
    castor::messages::sendMessage(m_socket,header,ZMQ_SNDMORE);
    castor::messages::sendMessage(m_socket,body);
332
}