TapeDaemon.cpp 45.2 KB
Newer Older
Steven Murray's avatar
Steven Murray committed
1
/******************************************************************************
 *         castor/tape/tapeserver/daemon/TapeDaemon.cpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *
 *
 * @author Steven.Murray@cern.ch
 *****************************************************************************/
 
25
#include "castor/common/CastorConfiguration.hpp"
Steven Murray's avatar
Steven Murray committed
26
#include "castor/exception/Errnum.hpp"
27
#include "castor/exception/BadAlloc.hpp"
28
#include "castor/io/io.hpp"
Daniele Kruse's avatar
Daniele Kruse committed
29
30
#include "castor/legacymsg/CommonMarshal.hpp"
#include "castor/legacymsg/TapeMarshal.hpp"
31
#include "castor/tape/tapebridge/Constants.hpp"
32
#include "castor/tape/tapeserver/daemon/AdminAcceptHandler.hpp"
33
#include "castor/tape/tapeserver/daemon/Constants.hpp"
34
35
#include "castor/tape/tapeserver/daemon/LabelCmdAcceptHandler.hpp"
#include "castor/tape/tapeserver/daemon/LabelSession.hpp"
36
#include "castor/tape/tapeserver/daemon/DataTransferSession.hpp"
Steven Murray's avatar
Steven Murray committed
37
#include "castor/tape/tapeserver/daemon/TapeDaemon.hpp"
38
#include "castor/tape/tapeserver/daemon/TapeMessageHandler.hpp"
39
#include "castor/tape/tapeserver/daemon/VdqmAcceptHandler.hpp"
Daniele Kruse's avatar
Daniele Kruse committed
40
#include "castor/tape/tapeserver/file/File.hpp"
Steven Murray's avatar
Steven Murray committed
41
#include "castor/tape/utils/utils.hpp"
42
#include "castor/utils/SmartFd.hpp"
43
#include "castor/utils/utils.hpp"
Daniele Kruse's avatar
Daniele Kruse committed
44
#include "h/Ctape.h"
45
46
#include "h/rtcp_constants.h"
#include "h/rtcpd_constants.h"
47

Steven Murray's avatar
Steven Murray committed
48
#include <algorithm>
49
#include <errno.h>
50
#include <limits.h>
Steven Murray's avatar
Steven Murray committed
51
52
#include <memory>
#include <signal.h>
53
54
#include <sys/prctl.h>
#include <sys/socket.h>
55
56
57
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
Steven Murray's avatar
Steven Murray committed
58
59
60
61

//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
62
castor::tape::tapeserver::daemon::TapeDaemon::TapeDaemon(
63
64
65
66
67
  const int argc,
  char **const argv,
  std::ostream &stdOut,
  std::ostream &stdErr,
  log::Logger &log,
68
  const utils::DriveConfigMap &driveConfigs,
69
70
  legacymsg::VdqmProxy &vdqm,
  legacymsg::VmgrProxy &vmgr,
71
  legacymsg::RmcProxyFactory &rmcFactory,
72
  messages::TapeserverProxyFactory &tapeserverFactory,
Daniele Kruse's avatar
Daniele Kruse committed
73
  legacymsg::NsProxyFactory &nsFactory,
74
  reactor::ZMQReactor &reactor,
75
  castor::server::ProcessCap &capUtils):
76
77
78
  castor::server::Daemon(stdOut, stdErr, log),
  m_argc(argc),
  m_argv(argv),
79
  m_driveConfigs(driveConfigs),
80
81
  m_vdqm(vdqm),
  m_vmgr(vmgr),
82
  m_rmcFactory(rmcFactory),
83
  m_tapeserverFactory(tapeserverFactory),
Daniele Kruse's avatar
Daniele Kruse committed
84
  m_nsFactory(nsFactory),
85
  m_reactor(reactor),
86
  m_capUtils(capUtils),
87
  m_programName("tapeserverd"),
88
89
  m_hostName(getHostName()),
  m_processForkerCmdSenderSocket(-1),
90
91
  m_processForkerPid(0),
  m_zmqContext(NULL) {
92
93
94
95
96
97
}

//------------------------------------------------------------------------------
// getHostName
//------------------------------------------------------------------------------
std::string
98
  castor::tape::tapeserver::daemon::TapeDaemon::getHostName()
99
  const  {
100
101
  char nameBuf[81];
  if(gethostname(nameBuf, sizeof(nameBuf))) {
102
103
    char message[100];
    sstrerror_r(errno, message, sizeof(message));
104
    castor::exception::Exception ex;
105
    ex.getMessage() << "Failed to get host name: " << message;
106
107
108
109
    throw ex;
  }

  return nameBuf;
Steven Murray's avatar
Steven Murray committed
110
111
112
113
114
115
}

//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::TapeDaemon::~TapeDaemon() throw() {
  // Close the socket used to send commands to the ProcessForker first, then
  // tear down the ZMQ context
  closeProcessForkerCmdSenderSocket();
  destroyZmqContext();
}

//------------------------------------------------------------------------------
// closeProcessForkerCmdSenderSocket
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::
  closeProcessForkerCmdSenderSocket() throw() {
125
126
127
128
129
130
131
132
133
134
135
  if(-1 != m_processForkerCmdSenderSocket) {
    std::list<log::Param> params;
    params.push_back(
      log::Param("cmdSenderSocket", m_processForkerCmdSenderSocket));
    if(close(m_processForkerCmdSenderSocket)) {
      char message[100];
      strerror_r(errno, message, sizeof(message));
      params.push_back(log::Param("message", message));
      m_log(LOG_ERR, "Failed to close the socket used for sending copmmands to"
        " the ProcessForker", params);
    } else {
Steven Murray's avatar
Steven Murray committed
136
137
      m_processForkerCmdSenderSocket = -1;
      m_log(LOG_INFO, "Successfully closed the socket used for sending commands"
138
139
140
        " to the ProcessForker", params);
    }
  }
Steven Murray's avatar
Steven Murray committed
141
}
142

Steven Murray's avatar
Steven Murray committed
143
144
145
146
//------------------------------------------------------------------------------
// destroyZmqContext
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::destroyZmqContext() throw() {
147
148
149
150
151
152
  if(NULL != m_zmqContext) {
    if(zmq_term(m_zmqContext)) {
      char message[100];
      sstrerror_r(errno, message, sizeof(message));
      castor::log::Param params[] = {castor::log::Param("message", message)};
      m_log(LOG_ERR, "Failed to destroy ZMQ context", params);
Steven Murray's avatar
Steven Murray committed
153
154
155
    } else {
      m_zmqContext = NULL;
      m_log(LOG_INFO, "Successfully destroyed ZMQ context");
156
157
    }
  }
Steven Murray's avatar
Steven Murray committed
158
159
160
161
162
}

//------------------------------------------------------------------------------
// main
//------------------------------------------------------------------------------
163
int castor::tape::tapeserver::daemon::TapeDaemon::main() throw() {
Steven Murray's avatar
Steven Murray committed
164
165
  try {

Steven Murray's avatar
Steven Murray committed
166
    exceptionThrowingMain(m_argc, m_argv);
Steven Murray's avatar
Steven Murray committed
167
168
169

  } catch (castor::exception::Exception &ex) {
    std::ostringstream msg;
Steven Murray's avatar
Steven Murray committed
170
171
    msg << "Aborting: Caught an unexpected exception: " <<
      ex.getMessage().str();
Steven Murray's avatar
Steven Murray committed
172
173
174
175
176
    m_stdErr << std::endl << msg.str() << std::endl << std::endl;

    log::Param params[] = {
      log::Param("Message", msg.str()),
      log::Param("Code"   , ex.code())};
177
    m_log(LOG_ERR, msg.str(), params);
Steven Murray's avatar
Steven Murray committed
178
179
180
181
182
183
184
185
186
187

    return 1;
  }

  return 0;
}

//------------------------------------------------------------------------------
// exceptionThrowingMain
//------------------------------------------------------------------------------
188
void  castor::tape::tapeserver::daemon::TapeDaemon::exceptionThrowingMain(
189
  const int argc, char **const argv)  {
Steven Murray's avatar
Steven Murray committed
190
191
  logStartOfDaemon(argc, argv);
  parseCommandLine(argc, argv);
192
  m_driveCatalogue.populateCatalogue(m_driveConfigs);
193
194
195

  // Process must be able to change user now and should be permitted to perform
  // raw IO in the future
196
  setProcessCapabilities("cap_setgid,cap_setuid+ep cap_sys_rawio+p");
197

198
  const bool runAsStagerSuperuser = true;
199
  daemonizeIfNotRunInForeground(runAsStagerSuperuser);
200
201
  setDumpable();
  forkProcessForker();
202

203
  // There is no longer any need for the process to be able to change user,
204
205
  // however the process should still be permitted to perform raw IO in the
  // future
206
207
  setProcessCapabilities("cap_sys_rawio+p");

Steven Murray's avatar
Steven Murray committed
208
  blockSignals();
209
  initZmqContext();
210
  setUpReactor();
211
  registerTapeDrivesWithVdqm();
212
  mainEventLoop();
Steven Murray's avatar
Steven Murray committed
213
214
215
216
217
218
219
220
221
222
223
224
225
}

//------------------------------------------------------------------------------
// logStartOfDaemon
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::logStartOfDaemon(
  const int argc, const char *const *const argv) throw() {
  const std::string concatenatedArgs = argvToString(argc, argv);
  std::ostringstream msg;
  msg << m_programName << " started";

  log::Param params[] = {
    log::Param("argv", concatenatedArgs)};
226
  m_log(LOG_INFO, msg.str(), params);
Steven Murray's avatar
Steven Murray committed
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
}

//------------------------------------------------------------------------------
// argvToString
//------------------------------------------------------------------------------
// Returns the first argc elements of argv concatenated into one string with
// a single space between consecutive elements.  An argc of zero or less
// gives an empty string.
std::string castor::tape::tapeserver::daemon::TapeDaemon::argvToString(
  const int argc, const char *const *const argv) throw() {
  std::string joined;

  for(int argIdx = 0; argIdx < argc; ++argIdx) {
    // Every argument except the first is preceded by a separator
    if(0 < argIdx) {
      joined += " ";
    }
    joined += argv[argIdx];
  }

  return joined;
}

246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
//------------------------------------------------------------------------------
// setDumpable
//------------------------------------------------------------------------------
// Requests the dumpable attribute of the process to be set to true, reads the
// attribute back, logs the value read and throws
// castor::exception::Exception if the value read is not true.
void castor::tape::tapeserver::daemon::TapeDaemon::setDumpable() {
  castor::utils::setDumpableProcessAttribute(true);

  // Read the attribute back so that the effective value is what gets logged
  // and checked
  const bool isDumpable = castor::utils::getDumpableProcessAttribute();
  const char *const dumpableStr = isDumpable ? "true" : "false";
  log::Param params[] = {log::Param("dumpable", dumpableStr)};
  m_log(LOG_INFO, "Got dumpable attribute of process", params);

  if(isDumpable) {
    return;
  }

  castor::exception::Exception ex;
  ex.getMessage() << "Failed to set dumpable attribute of process to true";
  throw ex;
}

//------------------------------------------------------------------------------
// setProcessCapabilities
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::setProcessCapabilities(
  const std::string &text) {
  try {
    m_capUtils.setProcText(text);
    log::Param params[] =
      {log::Param("capabilities", m_capUtils.getProcText())};
    m_log(LOG_INFO, "Set process capabilities", params);
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to set process capabilities to '" << text <<
      "': " << ne.getMessage().str();
    throw ex;
  }
}

//------------------------------------------------------------------------------
// forkProcessForker
//------------------------------------------------------------------------------
// Creates the socket pair used to control the ProcessForker and forks the
// ProcessForker process.
//
// In the parent process the command-sender socket and the pid of the child
// are stored in members, the command-receiver socket is closed and the method
// returns.
//
// In the child process the reactor is cleared, the command-sender socket is
// closed, runProcessForker() is called and the process then exits with a
// status of 0.
//
// Throws castor::exception::Exception if the socket pair cannot be created,
// if fork() fails or if one of the sockets cannot be closed.
void castor::tape::tapeserver::daemon::TapeDaemon::forkProcessForker() {
  m_log.prepareForFork();

  // Create a socket pair for controlling the ProcessForker
  int sv[2] = {-1 , -1};
  if(socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
    char message[100];
    // sstrerror_r() is used, as elsewhere in this file, because the GNU
    // variant of strerror_r() is not guaranteed to fill the supplied buffer
    sstrerror_r(errno, message, sizeof(message));
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to fork process forker: Failed to create socket"
      " pair for controlling the ProcessForker: " << message;
    throw ex;
  }

  // TapeDaemon should store the socket responsible for sending commands to the
  // ProcessForker
  //
  // The destructor of the TapeDaemon will close the socket responsible for
  // sending commands to the ProcessForker
  m_processForkerCmdSenderSocket = sv[0];

  const int processForkerCmdReceiverSocket = sv[1];

  {
    log::Param params[] = {
      log::Param("cmdSenderSocket", m_processForkerCmdSenderSocket),
      log::Param("cmdReceiverSocket", processForkerCmdReceiverSocket)};
    m_log(LOG_INFO, "TapeDaemon parent process successfully created socket"
      " pair for controlling the ProcessForker", params);
  }

  const pid_t forkRc = fork();

  // Remember the errno of fork() immediately: the close() performed when
  // fork() fails could otherwise overwrite it before it is reported
  const int forkErrno = errno;

  // If fork failed
  if(0 > forkRc) {
    close(processForkerCmdReceiverSocket);

    char message[100];
    sstrerror_r(forkErrno, message, sizeof(message));
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to fork ProcessForker: " << message;
    throw ex;

  // Else if this is the parent process
  } else if(0 < forkRc) {
    m_processForkerPid = forkRc;

    {
      log::Param params[] = {
        log::Param("processForkerPid", m_processForkerPid)};
      m_log(LOG_INFO, "Successfully forked the ProcessForker", params);
    }

    if(close(processForkerCmdReceiverSocket)) {
      char message[100];
      sstrerror_r(errno, message, sizeof(message));
      castor::exception::Exception ex;
      ex.getMessage() << "TapeDaemon parent process failed to close the socket"
        " used to receive ProcessForker commands: " << message;
      throw ex;
    }

    {
      log::Param params[] =
        {log::Param("cmdReceiverSocket", processForkerCmdReceiverSocket)};
      m_log(LOG_INFO, "TapeDaemon parent process successfully closed the socket"
        " used to receive ProcessForker commands", params);
    }

    return;

  // Else this is the child process
  } else {
    // Clear the reactor which in turn will close all of the open
    // file-descriptors owned by the event handlers
    m_reactor.clear();

    // Keep the descriptor number so that the log entry below reports the
    // socket that was actually closed and not the -1 written to the member
    const int closedCmdSenderSocket = m_processForkerCmdSenderSocket;

    if(close(closedCmdSenderSocket)) {
      char message[100];
      sstrerror_r(errno, message, sizeof(message));
      castor::exception::Exception ex;
      ex.getMessage() << "ProcessForker process failed to close the socket"
        " used to send ProcessForker commands: " << message;
      throw ex;
    }

    {
      m_processForkerCmdSenderSocket = -1;
      log::Param params[] =
        {log::Param("cmdSenderSocket", closedCmdSenderSocket)};
      m_log(LOG_INFO, "ProcessForker process successfully closed the socket"
        " used to send ProcessForker commands", params);
    }

    runProcessForker(processForkerCmdReceiverSocket);

    exit(0);
  }
}

//------------------------------------------------------------------------------
// runProcessForker
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::runProcessForker(
  const int cmdReceiverSocket) {

  if(close(cmdReceiverSocket)) {
    char message[100];
    sstrerror_r(errno, message, sizeof(message));
    castor::exception::Exception ex;
    ex.getMessage() << "ProcessForker process failed to close the socket"
      " used to receive ProcessForker commands: " << message;
    throw ex;
  }

  {
    log::Param params[] =
      {log::Param("cmdReceiverSocket", cmdReceiverSocket)};
    m_log(LOG_INFO, "ProcessForker process successfully closed the socket"
      " used to receive ProcessForker commands", params);
  }
}

Steven Murray's avatar
Steven Murray committed
405
406
407
//------------------------------------------------------------------------------
// blockSignals
//------------------------------------------------------------------------------
408
void castor::tape::tapeserver::daemon::TapeDaemon::blockSignals() const {
Steven Murray's avatar
Steven Murray committed
409
410
  sigset_t sigs;
  sigemptyset(&sigs);
411
  // The signals that should not asynchronously disturb the daemon
Steven Murray's avatar
Steven Murray committed
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
  sigaddset(&sigs, SIGHUP);
  sigaddset(&sigs, SIGINT);
  sigaddset(&sigs, SIGQUIT);
  sigaddset(&sigs, SIGPIPE);
  sigaddset(&sigs, SIGTERM);
  sigaddset(&sigs, SIGUSR1);
  sigaddset(&sigs, SIGUSR2);
  sigaddset(&sigs, SIGCHLD);
  sigaddset(&sigs, SIGTSTP);
  sigaddset(&sigs, SIGTTIN);
  sigaddset(&sigs, SIGTTOU);
  sigaddset(&sigs, SIGPOLL);
  sigaddset(&sigs, SIGURG);
  sigaddset(&sigs, SIGVTALRM);
  castor::exception::Errnum::throwOnNonZero(
    sigprocmask(SIG_BLOCK, &sigs, NULL),
428
429
430
    "Failed to block signals: sigprocmask() failed");
}

431
432
433
434
//------------------------------------------------------------------------------
// registerTapeDrivesWithVdqm
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::registerTapeDrivesWithVdqm()
435
   {
436
437
438
439
440
441
442
443
444
445
446
447
  const std::list<std::string> unitNames = m_driveCatalogue.getUnitNames();

  for(std::list<std::string>::const_iterator itor = unitNames.begin();
    itor != unitNames.end(); itor++) {
    registerTapeDriveWithVdqm(*itor);
  }
}

//------------------------------------------------------------------------------
// registerTapeDriveWithVdqm
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::registerTapeDriveWithVdqm(
448
  const std::string &unitName)  {
Daniele Kruse's avatar
Daniele Kruse committed
449
450
  const DriveCatalogueEntry *drive = m_driveCatalogue.findDrive(unitName);
  const utils::DriveConfig &driveConfig = drive->getConfig();
451
452

  std::list<log::Param> params;
453
  params.push_back(log::Param("server", m_hostName));
454
  params.push_back(log::Param("unitName", unitName));
455
  params.push_back(log::Param("dgn", driveConfig.dgn));
456

Daniele Kruse's avatar
Daniele Kruse committed
457
  switch(drive->getState()) {
Steven Murray's avatar
Steven Murray committed
458
  case DriveCatalogueEntry::DRIVE_STATE_DOWN:
459
    params.push_back(log::Param("state", "down"));
Steven Murray's avatar
Steven Murray committed
460
    m_log(LOG_INFO, "Registering tape drive in vdqm", params);
461
    m_vdqm.setDriveDown(m_hostName, unitName, driveConfig.dgn);
462
    break;
Steven Murray's avatar
Steven Murray committed
463
  case DriveCatalogueEntry::DRIVE_STATE_UP:
464
    params.push_back(log::Param("state", "up"));
Steven Murray's avatar
Steven Murray committed
465
    m_log(LOG_INFO, "Registering tape drive in vdqm", params);
466
    m_vdqm.setDriveUp(m_hostName, unitName, driveConfig.dgn);
467
    break;
468
469
  default:
    {
470
      castor::exception::Exception ex;
Steven Murray's avatar
Steven Murray committed
471
      ex.getMessage() << "Failed to register tape drive in vdqm"
472
473
        ": server=" << m_hostName << " unitName=" << unitName << " dgn=" <<
        driveConfig.dgn << ": Invalid drive state: state=" <<
Daniele Kruse's avatar
Daniele Kruse committed
474
        DriveCatalogueEntry::drvState2Str(drive->getState());
475
476
477
      throw ex;
    }
  }
478
479
}

480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
//------------------------------------------------------------------------------
// initZmqContext
//------------------------------------------------------------------------------
// Creates the ZMQ context with an IO thread pool of size 1 and stores it in
// the m_zmqContext member.
//
// Throws castor::exception::Exception if zmq_init() returns NULL.
void castor::tape::tapeserver::daemon::TapeDaemon::initZmqContext() {
  const int nbZmqIoThreads = 1;
  m_zmqContext = zmq_init(nbZmqIoThreads);

  if(NULL != m_zmqContext) {
    return;
  }

  // Creation failed: turn errno into an exception
  char message[100];
  sstrerror_r(errno, message, sizeof(message));
  castor::exception::Exception ex;
  ex.getMessage() << "Failed to instantiate ZMQ context: " << message;
  throw ex;
}

495
//------------------------------------------------------------------------------
496
// setUpReactor
497
//------------------------------------------------------------------------------
498
void castor::tape::tapeserver::daemon::TapeDaemon::setUpReactor() {
499
500
  createAndRegisterVdqmAcceptHandler();
  createAndRegisterAdminAcceptHandler();
501
  createAndRegisterLabelCmdAcceptHandler();
502
  createAndRegisterTapeMessageHandler();
503
504
505
506
507
}

//------------------------------------------------------------------------------
// createAndRegisterVdqmAcceptHandler
//------------------------------------------------------------------------------
508
void castor::tape::tapeserver::daemon::TapeDaemon::createAndRegisterVdqmAcceptHandler()  {
509
  castor::utils::SmartFd listenSock;
510
  try {
511
    listenSock.reset(io::createListenerSock(TAPE_SERVER_VDQM_LISTENING_PORT));
512
513
514
515
516
517
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex(ne.code());
    ex.getMessage() << "Failed to create socket to listen for vdqm connections"
      ": " << ne.getMessage().str();
    throw ex;
  }
518
519
  {
    log::Param params[] = {
520
      log::Param("listeningPort", TAPE_SERVER_VDQM_LISTENING_PORT)};
521
522
523
    m_log(LOG_INFO, "Listening for connections from the vdqmd daemon", params);
  }

524
  std::auto_ptr<VdqmAcceptHandler> vdqmAcceptHandler;
525
  try {
526
    vdqmAcceptHandler.reset(new VdqmAcceptHandler(listenSock.get(), m_reactor,
527
      m_log, m_driveCatalogue));
528
    listenSock.release();
529
530
531
  } catch(std::bad_alloc &ba) {
    castor::exception::BadAlloc ex;
    ex.getMessage() <<
532
      "Failed to create event handler for accepting vdqm connections"
533
534
      ": " << ba.what();
    throw ex;
535
  }
536
537
  m_reactor.registerHandler(vdqmAcceptHandler.get());
  vdqmAcceptHandler.release();
538
539
540
}

//------------------------------------------------------------------------------
541
// createAndRegisterAdminAcceptHandler
542
//------------------------------------------------------------------------------
543
void castor::tape::tapeserver::daemon::TapeDaemon::createAndRegisterAdminAcceptHandler()  {
544
  castor::utils::SmartFd listenSock;
545
  try {
546
    listenSock.reset(io::createListenerSock(TAPE_SERVER_ADMIN_LISTENING_PORT));
547
548
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex(ne.code());
549
550
    ex.getMessage() <<
      "Failed to create socket to listen for admin command connections"
551
552
553
554
555
556
      ": " << ne.getMessage().str();
    throw ex;
  }
  {
    log::Param params[] = {
      log::Param("listeningPort", TAPE_SERVER_ADMIN_LISTENING_PORT)};
557
558
    m_log(LOG_INFO, "Listening for connections from the admin commands",
      params);
559
560
561
562
  }

  std::auto_ptr<AdminAcceptHandler> adminAcceptHandler;
  try {
563
    adminAcceptHandler.reset(new AdminAcceptHandler(listenSock.get(), m_reactor,
564
      m_log, m_vdqm, m_driveCatalogue, m_hostName));
565
    listenSock.release();
566
567
568
  } catch(std::bad_alloc &ba) {
    castor::exception::BadAlloc ex;
    ex.getMessage() <<
569
      "Failed to create event handler for accepting admin connections"
570
571
572
573
574
      ": " << ba.what();
    throw ex;
  }
  m_reactor.registerHandler(adminAcceptHandler.get());
  adminAcceptHandler.release();
575
576
}

577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
//------------------------------------------------------------------------------
// createAndRegisterLabelCmdAcceptHandler
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::createAndRegisterLabelCmdAcceptHandler()  {
  castor::utils::SmartFd listenSock;
  try {
    listenSock.reset(io::createListenerSock(TAPE_SERVER_LABELCMD_LISTENING_PORT));
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex(ne.code());
    ex.getMessage() <<
      "Failed to create socket to listen for admin command connections"
      ": " << ne.getMessage().str();
    throw ex;
  }
  {
    log::Param params[] = {
      log::Param("listeningPort", TAPE_SERVER_LABELCMD_LISTENING_PORT)};
    m_log(LOG_INFO, "Listening for connections from label command",
      params);
  }

  std::auto_ptr<LabelCmdAcceptHandler> labelCmdAcceptHandler;
  try {
Steven Murray's avatar
Steven Murray committed
600
601
    labelCmdAcceptHandler.reset(new LabelCmdAcceptHandler(listenSock.get(),
      m_reactor, m_log, m_driveCatalogue, m_hostName, m_vdqm, m_vmgr));
602
603
604
605
606
607
608
609
610
611
612
    listenSock.release();
  } catch(std::bad_alloc &ba) {
    castor::exception::BadAlloc ex;
    ex.getMessage() <<
      "Failed to create event handler for accepting label-command connections"
      ": " << ba.what();
    throw ex;
  }
  m_reactor.registerHandler(labelCmdAcceptHandler.get());
  labelCmdAcceptHandler.release();
}
Daniele Kruse's avatar
Daniele Kruse committed
613

614
//------------------------------------------------------------------------------
615
// createAndRegisterTapeMessageHandler
616
//------------------------------------------------------------------------------
617
618
void castor::tape::tapeserver::daemon::TapeDaemon::
  createAndRegisterTapeMessageHandler()  {
619
620
  std::auto_ptr<TapeMessageHandler> tapeMessageHandler;
  try {
621
622
    tapeMessageHandler.reset(new TapeMessageHandler(m_reactor, m_log,
      m_driveCatalogue, m_hostName, m_vdqm, m_vmgr, m_zmqContext));
623
624
625
626
627
628
629
630
631
632
633
  } catch(std::bad_alloc &ba) {
    castor::exception::BadAlloc ex;
    ex.getMessage() <<
      "Failed to create event handler for communicating with forked sessions"
      ": " << ba.what();
    throw ex;
  }
  m_reactor.registerHandler(tapeMessageHandler.get());
  tapeMessageHandler.release();
}

634
//------------------------------------------------------------------------------
635
// mainEventLoop
636
//------------------------------------------------------------------------------
637
void castor::tape::tapeserver::daemon::TapeDaemon::mainEventLoop() {
638
  while(handleEvents()) {
639
    forkDataTransferSessions();
640
    forkLabelSessions();
641
642
643
  }
}

644
645
646
647
//------------------------------------------------------------------------------
// handleEvents
//------------------------------------------------------------------------------
bool castor::tape::tapeserver::daemon::TapeDaemon::handleEvents()
648
   {
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
  // With our current understanding we see no reason for an exception from the
  // reactor to be used as a reason to stop the tapeserverd daemon.
  //
  // In the future, one or more specific exception types could be introduced
  // with their own catch clauses that do in fact require the tapeserverd daemon
  // to be stopped.
  try {
    const int timeout = 100; // 100 milliseconds
    m_reactor.handleEvents(timeout);
  } catch(castor::exception::Exception &ex) {
    // Log exception and continue
    log::Param params[] = {log::Param("message", ex.getMessage().str())};
    m_log(LOG_ERR, "Unexpected exception thrown when handling an I/O event",
      params);
  } catch(std::exception &se) {
    // Log exception and continue
    log::Param params[] = {log::Param("message", se.what())};
    m_log(LOG_ERR, "Unexpected exception thrown when handling an I/O event",
      params);
  } catch(...) {
    // Log exception and continue
    m_log(LOG_ERR,
      "Unexpected and unknown exception thrown when handling an I/O event");
  }

674
  return handlePendingSignals();
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
}

//------------------------------------------------------------------------------
// handlePendingSignals
//------------------------------------------------------------------------------
// Consumes pending signals one at a time, without waiting, until
// sigtimedwait() stops returning a signal number.  SIGINT and SIGTERM request
// a graceful stop, SIGCHLD triggers the reaping of zombie processes and every
// other signal is logged as ignored.
//
// Returns false if SIGINT or SIGTERM was consumed, otherwise true.
bool castor::tape::tapeserver::daemon::TapeDaemon::handlePendingSignals()
  throw() {
  bool keepRunning = true;

  sigset_t everySignal;
  sigfillset(&everySignal);
  siginfo_t info;
  struct timespec noWait = {0, 0};

  for(;;) {
    // A zero timeout makes this a poll for an already pending signal
    const int sig = sigtimedwait(&everySignal, &info, &noWait);
    if(0 >= sig) {
      break;
    }

    if(SIGINT == sig) { // Signal number 2
      m_log(LOG_INFO, "Stopping gracefully because SIGINT was received");
      keepRunning = false;
    } else if(SIGTERM == sig) { // Signal number 15
      m_log(LOG_INFO, "Stopping gracefully because SIGTERM was received");
      keepRunning = false;
    } else if(SIGCHLD == sig) { // Signal number 17
      reapZombies();
    } else {
      log::Param params[] = {log::Param("signal", sig)};
      m_log(LOG_INFO, "Ignoring signal", params);
    }
  }

  return keepRunning;
}

//------------------------------------------------------------------------------
// reapZombies
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::reapZombies() throw() {
719
  pid_t pid = 0;
720
  int waitpidStat = 0;
721
722
  while (0 < (pid = waitpid(-1, &waitpidStat, WNOHANG))) {
    handleReapedProcess(pid, waitpidStat);
723
724
725
726
  }
}

//------------------------------------------------------------------------------
727
// handleReapedProcess
728
//------------------------------------------------------------------------------
729
730
731
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedProcess(
  const pid_t pid, const int waitpidStat) throw() {
  logChildProcessTerminated(pid, waitpidStat);
732

733
734
735
736
737
738
  if(pid == m_processForkerPid) {
    handleReapedProcessForker(pid, waitpidStat);
  } else {
    handleReapedSession(pid, waitpidStat);
  }
}
739

740
741
742
743
744
745
746
747
748
749
//------------------------------------------------------------------------------
// handleReapedProcessForker
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedProcessForker(
  const pid_t pid, const int waitpidStat) throw() {
  log::Param params[] = {
    log::Param("processForkerPid", pid)};
  m_log(LOG_INFO, "Handling reaped ProcessForker", params);
  m_processForkerPid = 0;
}
750

751
752
753
754
755
756
757
758
759
//------------------------------------------------------------------------------
// handleReapedSession
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedSession(
  const pid_t pid, const int waitpidStat) throw() {
  try {
    const DriveCatalogueEntry *const drive =
      m_driveCatalogue.findConstDrive(pid);
    dispatchReapedSessionHandler(drive->getSessionType(), pid,
760
      waitpidStat);
761
  } catch(castor::exception::Exception &ne) {
762
763
    log::Param params[] = {log::Param("message", ne.getMessage().str())};
    m_log(LOG_ERR, "Failed to handle reaped session",
764
      params);
Steven Murray's avatar
Steven Murray committed
765
  }
766
767
768
}

//------------------------------------------------------------------------------
769
// logChildProcessTerminated
770
//------------------------------------------------------------------------------
771
772
void castor::tape::tapeserver::daemon::TapeDaemon::logChildProcessTerminated(
  const pid_t pid, const int waitpidStat) throw() {
773
  std::list<log::Param> params;
774
  params.push_back(log::Param("terminatedPid", pid));
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799

  if(WIFEXITED(waitpidStat)) {
    params.push_back(log::Param("WEXITSTATUS", WEXITSTATUS(waitpidStat)));
  }

  if(WIFSIGNALED(waitpidStat)) {
    params.push_back(log::Param("WTERMSIG", WTERMSIG(waitpidStat)));
  }

  if(WCOREDUMP(waitpidStat)) {
    params.push_back(log::Param("WCOREDUMP", "true"));
  } else {
    params.push_back(log::Param("WCOREDUMP", "false"));
  }

  if(WIFSTOPPED(waitpidStat)) {
    params.push_back(log::Param("WSTOPSIG", WSTOPSIG(waitpidStat)));
  }

  if(WIFCONTINUED(waitpidStat)) {
    params.push_back(log::Param("WIFCONTINUED", "true"));
  } else {
    params.push_back(log::Param("WIFCONTINUED", "false"));
  }

800
  m_log(LOG_INFO, "Child process terminated", params);
801
802
}

803
//------------------------------------------------------------------------------
804
// dispatchReapedSessionHandler
805
806
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::
807
  dispatchReapedSessionHandler(
808
  const DriveCatalogueEntry::SessionType sessionType,
809
  const pid_t pid,
810
811
812
813
  const int waitpidStat) {

  switch(sessionType) {
  case DriveCatalogueEntry::SESSION_TYPE_DATATRANSFER:
814
    return handleReapedDataTransferSession(pid, waitpidStat);
815
  case DriveCatalogueEntry::SESSION_TYPE_LABEL:
816
    return handleReapedLabelSession(pid, waitpidStat);
817
818
819
  default:
    {
      castor::exception::Exception ex;
820
      ex.getMessage() << "Failed to dispatch handler for reaped session"
821
822
823
824
825
826
        ": Unexpected session type: sessionType=" << sessionType;
      throw ex;
    }
  }
}

827
//------------------------------------------------------------------------------
828
// handleReapedDataTransferSession
829
//------------------------------------------------------------------------------
830
831
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedDataTransferSession(
  const pid_t pid, const int waitpidStat) {
832
833
  try {
    std::list<log::Param> params;
834
835
    params.push_back(log::Param("dataTransferPid", pid));
    DriveCatalogueEntry *const drive = m_driveCatalogue.findDrive(pid);
Daniele Kruse's avatar
Daniele Kruse committed
836
    const utils::DriveConfig &driveConfig = drive->getConfig();
Steven Murray's avatar
Steven Murray committed
837

838
    if(WIFEXITED(waitpidStat) && 0 == WEXITSTATUS(waitpidStat)) {
Daniele Kruse's avatar
Daniele Kruse committed
839
840
      const std::string vid = drive->getVid();
      drive->sessionSucceeded();
841
842
843
      m_log(LOG_INFO, "Data-transfer session succeeded", params);
      requestVdqmToReleaseDrive(driveConfig, pid);
      notifyVdqmTapeUnmounted(driveConfig, vid, pid);
844
    } else {
Daniele Kruse's avatar
Daniele Kruse committed
845
      drive->sessionFailed();
846
847
      m_log(LOG_INFO, "Data-transfer session failed", params);
      setDriveDownInVdqm(pid, drive->getConfig());
848
    }
849
850
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
851
    ex.getMessage() << "Failed to handle reaped data transfer session: " << 
852
853
    ne.getMessage().str();
    throw ex;
854
  }
Steven Murray's avatar
Steven Murray committed
855
}
856

857
858
859
860
//------------------------------------------------------------------------------
// requestVdqmToReleaseDrive
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::requestVdqmToReleaseDrive(
861
  const utils::DriveConfig &driveConfig, const pid_t pid) {
862
863
864
  std::list<log::Param> params;
  try {
    const bool forceUnmount = true;
865

866
    params.push_back(log::Param("pid", pid));
867
868
    params.push_back(log::Param("unitName", driveConfig.unitName));
    params.push_back(log::Param("dgn", driveConfig.dgn));
869
870
    params.push_back(log::Param("forceUnmount", forceUnmount));

871
    m_vdqm.releaseDrive(m_hostName, driveConfig.unitName, driveConfig.dgn,
872
      forceUnmount, pid);
873
874
875
    m_log(LOG_INFO, "Requested vdqm to release drive", params);
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
876
    ex.getMessage() << "Failed to request vdqm to release drive: " <<
877
878
879
880
881
      ne.getMessage().str();
    throw ex;
  }
}

882
//------------------------------------------------------------------------------
883
// notifyVdqmTapeUnmounted
884
//------------------------------------------------------------------------------
885
886
void castor::tape::tapeserver::daemon::TapeDaemon::notifyVdqmTapeUnmounted(
  const utils::DriveConfig &driveConfig, const std::string &vid,
887
  const pid_t pid) {
888
  try {
889
    std::list<log::Param> params;
890
    params.push_back(log::Param("pid", pid));
891
892
893
    params.push_back(log::Param("unitName", driveConfig.unitName));
    params.push_back(log::Param("vid", vid));
    params.push_back(log::Param("dgn", driveConfig.dgn));
894

895
896
    m_vdqm.tapeUnmounted(m_hostName, driveConfig.unitName, driveConfig.dgn,
      vid);
897
    m_log(LOG_INFO, "Notified vdqm that a tape was unmounted", params);
898
899
900
901
902
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed notify vdqm that a tape was unmounted: " <<
      ne.getMessage().str();
    throw ex;
903
  }
904
}
905

906
907
908
//------------------------------------------------------------------------------
// setDriveDownInVdqm
//------------------------------------------------------------------------------
909
void castor::tape::tapeserver::daemon::TapeDaemon::setDriveDownInVdqm(
910
  const pid_t pid, const utils::DriveConfig &driveConfig) {
911
  std::list<log::Param> params;
912
  params.push_back(log::Param("pid", pid));
913

914
915
916
917
918
  try {
    params.push_back(log::Param("unitName", driveConfig.unitName));
    params.push_back(log::Param("dgn", driveConfig.dgn));

    m_vdqm.setDriveDown(m_hostName, driveConfig.unitName, driveConfig.dgn);
919
    m_log(LOG_INFO, "Set tape-drive down in vdqm", params);
920
921
922
923
924
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to set tape-drive down in vdqm: " <<
      ne.getMessage().str();
    throw ex;
925
  }
Steven Murray's avatar
Steven Murray committed
926
}
927

928
//------------------------------------------------------------------------------
929
// handleReapedLabelSession 
930
//------------------------------------------------------------------------------
931
932
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedLabelSession(
  const pid_t pid, const int waitpidStat) {
933
934
  try {
    std::list<log::Param> params;
935
    params.push_back(log::Param("labelPid", pid));
936

937
    DriveCatalogueEntry *const drive = m_driveCatalogue.findDrive(pid);
938

939
    if(WIFEXITED(waitpidStat) && 0 == WEXITSTATUS(waitpidStat)) {
Daniele Kruse's avatar
Daniele Kruse committed
940
      drive->sessionSucceeded();
941
942
      m_log(LOG_INFO, "Label session succeeded", params);
    } else {
Daniele Kruse's avatar
Daniele Kruse committed
943
      drive->sessionFailed();
944
      m_log(LOG_INFO, "Label session failed", params);
945
      setDriveDownInVdqm(pid, drive->getConfig());
946
947
948
    }
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
949
    ex.getMessage() << "Failed to handle reaped label session: " <<
950
951
    ne.getMessage().str();
    throw ex;
952
953
954
  }
}

955
//------------------------------------------------------------------------------
956
// forkDataTransferSessions
957
//------------------------------------------------------------------------------
Steven Murray's avatar
Steven Murray committed
958
959
960
961
void castor::tape::tapeserver::daemon::TapeDaemon::forkDataTransferSessions()
  throw() {
  const std::list<std::string> unitNames =
    m_driveCatalogue.getUnitNamesWaitingForTransferFork();
962
963
964

  for(std::list<std::string>::const_iterator itor = unitNames.begin();
    itor != unitNames.end(); itor++) {
965
    const std::string unitName = *itor;
Daniele Kruse's avatar
Daniele Kruse committed
966
    DriveCatalogueEntry *drive = m_driveCatalogue.findDrive(unitName);
967
    forkDataTransferSession(drive);
968
969
970
971
  }
}

//------------------------------------------------------------------------------
972
// forkDataTransferSession
973
//------------------------------------------------------------------------------
974
void castor::tape::tapeserver::daemon::TapeDaemon::forkDataTransferSession(
Daniele Kruse's avatar
Daniele Kruse committed
975
976
  DriveCatalogueEntry *drive) throw() {
  const utils::DriveConfig &driveConfig = drive->getConfig();
977

978
979
  std::list<log::Param> params;
  params.push_back(log::Param("unitName", driveConfig.unitName));
980

981
  m_log.prepareForFork();
982

983
  const pid_t forkRc = fork();
984
985

  // If fork failed
986
  if(0 > forkRc) {
987
    // Log an error message and return
988
989
990
    char message[100];
    sstrerror_r(errno, message, sizeof(message));
    params.push_back(log::Param("message", message));
Steven Murray's avatar
Steven Murray committed
991
992
    m_log(LOG_ERR, "Failed to fork data-transfer session for tape drive",
      params);
993
    return;
994
995

  // Else if this is the parent process
996
  } else if(0 < forkRc) {
997
    drive->forkedDataTransferSession(forkRc);
998
    return;
999
1000

  // Else this is the child process
For faster browsing, not all history is shown. View entire blame