ProcessForker.cpp 35.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

24
#include "castor/acs/Constants.hpp"
25
#include "castor/exception/Exception.hpp"
26
#include "castor/legacymsg/RmcProxyTcpIp.hpp"
27
28
29
#include "castor/mediachanger/MediaChangerFacade.hpp"
#include "castor/mediachanger/MmcProxyLog.hpp"
#include "castor/messages/AcsProxyZmq.hpp"
30
#include "castor/messages/Constants.hpp"
31
32
33
#include "castor/messages/ForkCleaner.pb.h"
#include "castor/messages/ForkDataTransfer.pb.h"
#include "castor/messages/ForkLabel.pb.h"
34
#include "castor/messages/ForkSucceeded.pb.h"
35
36
#include "castor/messages/ProcessCrashed.pb.h"
#include "castor/messages/ProcessExited.pb.h"
37
#include "castor/messages/ReturnValue.pb.h"
38
#include "castor/messages/SmartZmqContext.hpp"
39
#include "castor/messages/StopProcessForker.pb.h"
40
41
#include "castor/messages/TapeserverProxyZmq.hpp"
#include "castor/tape/tapeserver/daemon/Constants.hpp"
42
43
#include "castor/tape/tapeserver/daemon/CleanerSession.hpp"
#include "castor/tape/tapeserver/daemon/DataTransferSession.hpp"
44
#include "castor/tape/tapeserver/daemon/DriveConfig.hpp"
45
#include "castor/tape/tapeserver/daemon/LabelSession.hpp"
46
#include "castor/tape/tapeserver/daemon/ProcessForker.hpp"
47
#include "castor/tape/tapeserver/daemon/ProcessForkerUtils.hpp"
48
#include "castor/utils/SmartArrayPtr.hpp"
49
#include "castor/utils/utils.hpp"
50
#include "catalogue/CatalogueFactory.hpp"
51
52
#include "rdbms/Sqlite.hpp"
#include "rdbms/SqliteConn.hpp"
53
#include "objectstore/BackendVFS.hpp"
54
#include "objectstore/BackendFactory.hpp"
55
#include "objectstore/BackendPopulator.hpp"
56
57
58
59
#include "objectstore/RootEntry.hpp"
#include "remotens/EosNS.hpp"
#include "scheduler/OStoreDB/OStoreDB.hpp"
#include "scheduler/Scheduler.hpp"
60
#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp"
61

62
#include <errno.h>
63
#include <memory>
64
65
66
67
#include <poll.h>
#include <sstream>
#include <sys/types.h>
#include <sys/wait.h>
68
69

//------------------------------------------------------------------------------
70
// constructor
71
//------------------------------------------------------------------------------
72
73
74
75
castor::tape::tapeserver::daemon::ProcessForker::ProcessForker(
  log::Logger &log,
  const int cmdSocket,
  const int reaperSocket,
76
  const std::string &hostName,
77
  char *const argv0,
78
  const TapeDaemonConfig &config) throw():
79
80
81
  m_log(log),
  m_cmdSocket(cmdSocket),
  m_reaperSocket(reaperSocket),
82
  m_hostName(hostName),
83
84
  m_argv0(argv0),
  m_config(config) {
85
86
87
}

//------------------------------------------------------------------------------
88
// destructor
89
//------------------------------------------------------------------------------
90
castor::tape::tapeserver::daemon::ProcessForker::~ProcessForker() throw() {
91
92
93
94
95
96
97
98
99
100
101
102
  closeCmdReceiverSocket();
}

//------------------------------------------------------------------------------
// closeCmdReceiverSocket
//------------------------------------------------------------------------------
// Closes the socket on which ProcessForker commands are received and logs the
// outcome. Does nothing if the socket is already marked as closed (-1).
//
// The member is reset to -1 after the close attempt. A later call (for example
// from the destructor) therefore cannot close the same descriptor number
// again, which by then may have been reused for an unrelated file.
void castor::tape::tapeserver::daemon::ProcessForker::closeCmdReceiverSocket()
  throw() {
  if(-1 == m_cmdSocket) {
    return;
  }

  std::list<log::Param> params;
  params.push_back(log::Param("cmdSocket", m_cmdSocket));

  const int closeRc = close(m_cmdSocket);
  // Save errno straight away so that nothing below can overwrite it
  const int closeErrno = errno;

  // Forget the descriptor whatever the outcome: on Linux the descriptor is
  // released even when close() reports an error, so closing it again would be
  // unsafe
  m_cmdSocket = -1;

  if(-1 == closeRc) {
    const std::string message = castor::utils::errnoToString(closeErrno);
    params.push_back(log::Param("message", message));
    m_log(LOG_ERR, "Failed to close command receiver socket",
      params);
  } else {
    m_log(LOG_INFO, "Closed command receiver socket", params);
  }
}

//------------------------------------------------------------------------------
// execute
//------------------------------------------------------------------------------
116
117
118
void castor::tape::tapeserver::daemon::ProcessForker::execute() throw() {
  // The main event loop
  while(handleEvents()) {
119
120
121
122
123
124
  }
}

//------------------------------------------------------------------------------
// handleEvents
//------------------------------------------------------------------------------
125
126
127
128
bool castor::tape::tapeserver::daemon::ProcessForker::handleEvents() throw() {
  try {
    return handlePendingMsgs() && handlePendingSignals();
  } catch(castor::exception::Exception &ex) {
129
    std::list<log::Param> params = {log::Param("message", ex.getMessage().str())};
130
131
    m_log(LOG_ERR, "ProcessForker failed to handle events", params);
  } catch(std::exception &se) {
132
    std::list<log::Param> params = {log::Param("message", se.what())};
133
134
    m_log(LOG_ERR, "ProcessForker failed to handle events", params);
  } catch(...) {
135
    std::list<log::Param> params =
136
137
138
      {log::Param("message", "Caught an unknown exception")};
    m_log(LOG_ERR, "ProcessForker failed to handle events", params);
  }
139
140

  // If program execution reached here then an exception was thrown
141
142
  m_log(LOG_ERR, "ProcessForker is gracefully shutting down");
  return false;
143
144
145
146
147
148
}

//------------------------------------------------------------------------------
// handlePendingMsgs
//------------------------------------------------------------------------------
bool castor::tape::tapeserver::daemon::ProcessForker::handlePendingMsgs() {
149
150
151
152
153
  if(thereIsAPendingMsg()) {
    return handleMsg();
  } else {
    return true; // The main event loop should continue
  }
154
155
156
}

//------------------------------------------------------------------------------
157
// thereIsAPendingMsg
158
//------------------------------------------------------------------------------
159
bool castor::tape::tapeserver::daemon::ProcessForker::thereIsAPendingMsg() {
160

161
162
  // Call poll() in orer to see if there is any data to be read
  struct pollfd fds;
163
  fds.fd = m_cmdSocket;
164
165
166
167
  fds.events = POLLIN;
  fds.revents = 0;
  const int timeout = 100; // Timeout in milliseconds
  const int pollRc = poll(&fds, 1, timeout);
168

169
170
171
172
173
174
  // Return true of false depending on the result from poll()
  switch(pollRc) {
  case 0: // Timeout
    return false;
  case -1: // Error
    {
175
      const std::string message = castor::utils::errnoToString(errno);
176
      std::list<log::Param> params = {log::Param("message", message)};
177
178
179
180
181
182
183
184
185
186
187
188
      m_log(LOG_ERR,
        "Error detected when checking for a pending ProcessForker message",
        params);
      return false;
    }
  case 1: // There is a possibility of a pending message
    return fds.revents & POLLIN ? true : false;
  default: // Unexpected return value
    {
      std::ostringstream message;
      message << "poll returned an unexpected value"
        ": expected=0 or 1 actual=" << pollRc;
189
      std::list<log::Param> params = {log::Param("message", message.str())};
190
191
192
193
194
195
      m_log(LOG_ERR,
        "Error detected when checking for a pending ProcessForker message",
        params);
      return false;
    }
  }
196
197
198
}

//------------------------------------------------------------------------------
199
// handleMsg
200
//------------------------------------------------------------------------------
201
202
203
bool castor::tape::tapeserver::daemon::ProcessForker::handleMsg() {
  ProcessForkerFrame frame;
  try {
204
205
    const int timeout = 10; // Timeout in seconds
    frame = ProcessForkerUtils::readFrame(m_cmdSocket, timeout);
206
  } catch(castor::exception::Exception &ne) {
207
    castor::exception::Exception ex;
208
    ex.getMessage() << "Failed to handle message: " << ne.getMessage().str();
209
210
211
    throw ex;
  }

212
  std::list<log::Param> params = {
213
    log::Param("type", messages::msgTypeToString(frame.type)),
214
215
216
217
218
219
220
221
222
223
    log::Param("len", frame.payload.length())};
  m_log(LOG_INFO, "ProcessForker handling a ProcessForker message", params);

  MsgHandlerResult result;
  try {
    result = dispatchMsgHandler(frame);
  } catch(castor::exception::Exception &ex) {
    log::Param("message", ex.getMessage().str());
    m_log(LOG_ERR, "ProcessForker::dispatchMsgHandler() threw an exception",
      params);
224
225
    messages::Exception msg;
    msg.set_code(ex.code());
226
    msg.set_message(ex.getMessage().str());
227
    ProcessForkerUtils::writeFrame(m_cmdSocket, msg);
228
229
230
231
232
    return true; // The main event loop should continue
  } catch(std::exception &se) {
    log::Param("message", se.what());
    m_log(LOG_ERR, "ProcessForker::dispatchMsgHandler() threw an exception",
      params);
233
    messages::Exception msg;
234
    msg.set_code(666);
235
    msg.set_message(se.what());
236
    ProcessForkerUtils::writeFrame(m_cmdSocket, msg);
237
238
239
240
    return true; // The main event loop should continue
  } catch(...) {
    m_log(LOG_ERR,
      "ProcessForker::dispatchMsgHandler() threw an unknown exception");
241
    messages::Exception msg;
242
    msg.set_code(666);
243
    msg.set_message("Caught and unknown and unexpected exception");
244
    ProcessForkerUtils::writeFrame(m_cmdSocket, msg);
245
    return true; // The main event loop should continue
246
247
  }

248
  ProcessForkerUtils::writeFrame(m_cmdSocket, result.reply);
249
  {
250
    std::list<log::Param> params = {
251
      log::Param("payloadType",
252
        messages::msgTypeToString(result.reply.type)),
253
254
255
256
      log::Param("payloadLen", result.reply.payload.length())};
    m_log(LOG_DEBUG, "ProcessForker wrote reply", params);
  }
  return result.continueMainEventLoop;
257
258
259
260
261
}

//------------------------------------------------------------------------------
// dispatchMsgHandler
//------------------------------------------------------------------------------
262
263
264
265
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
  castor::tape::tapeserver::daemon::ProcessForker::dispatchMsgHandler(
  const ProcessForkerFrame &frame) {
  switch(frame.type) {
266
  case messages::MSG_TYPE_FORKCLEANER:
267
    return handleForkCleanerMsg(frame);
268
  case messages::MSG_TYPE_FORKDATATRANSFER:
269
    return handleForkDataTransferMsg(frame);
270
  case messages::MSG_TYPE_FORKLABEL:
271
    return handleForkLabelMsg(frame);
272
  case messages::MSG_TYPE_STOPPROCESSFORKER:
273
    return handleStopProcessForkerMsg(frame);
274
275
276
277
  default:
    {
      castor::exception::Exception ex;
      ex.getMessage() << "Failed to dispatch message handler"
278
        ": Unknown message type: type=" << frame.type;
279
280
281
282
283
284
      throw ex;
    }
  }
}

//------------------------------------------------------------------------------
285
// handleForkCleanerMsg
286
//------------------------------------------------------------------------------
287
288
289
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult 
  castor::tape::tapeserver::daemon::ProcessForker::handleForkCleanerMsg(
  const ProcessForkerFrame &frame) {
290

291
292
293
294
295
296
297
  // Parse the incoming request
  messages::ForkCleaner rqst;
  ProcessForkerUtils::parsePayload(frame, rqst);

  // Log the contents of the incomming request
  std::list<log::Param> params;
  params.push_back(log::Param("unitName", rqst.unitname()));
298
  params.push_back(log::Param("TPVID", rqst.vid()));
299
300
301
302
  params.push_back(log::Param("waitMediaInDrive",
    rqst.waitmediaindrive()));
  params.push_back(log::Param("waitMediaInDriveTimeout",
    rqst.waitmediaindrivetimeout()));
303
304
305
306
307
308
309
  m_log(LOG_INFO, "ProcessForker handling ForkCleaner message", params);

  // Fork a label session
  const pid_t forkRc = fork();

  // If fork failed
  if(0 > forkRc) {
310
    return createExceptionResult(666,
311
      "Failed to fork cleaner session for tape drive", true);
312
313
314

  // Else if this is the parent process
  } else if(0 < forkRc) {
315
    std::list<log::Param> params = {log::Param("pid", forkRc)};
316
317
    m_log(LOG_INFO, "ProcessForker forked cleaner session", params);

318
    return createForkSucceededResult(forkRc, true);
319
320
321

  // Else this is the child process
  } else {
322
    closeCmdReceiverSocket();
323

324
    castor::utils::setProcessNameAndCmdLine(m_argv0, "cleaner");
325

326
327
328
    try {
      exit(runCleanerSession(rqst));
    } catch(castor::exception::Exception &ne) {
329
      std::list<log::Param> params = {log::Param("message", ne.getMessage().str())};
Steven Murray's avatar
Steven Murray committed
330
      m_log(LOG_ERR, "Cleaner session failed", params);
331
    } catch(std::exception &ne) {
332
      std::list<log::Param> params = {log::Param("message", ne.what())};
Steven Murray's avatar
Steven Murray committed
333
      m_log(LOG_ERR, "Cleaner session failed", params);
334
    } catch(...) {
335
      std::list<log::Param> params = {log::Param("message",
336
        "Caught an unknown exception")};
Steven Murray's avatar
Steven Murray committed
337
      m_log(LOG_ERR, "Cleaner session failed", params);
338
    }
339
    exit(Session::MARK_DRIVE_AS_DOWN);
340
341
342
343
  }
}

//------------------------------------------------------------------------------
344
// handleForkDataTransferMsg
345
//------------------------------------------------------------------------------
346
347
348
349
350
351
352
353
354
355
356
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult 
  castor::tape::tapeserver::daemon::ProcessForker::handleForkDataTransferMsg(
  const ProcessForkerFrame &frame) {

  // Parse the incoming request
  messages::ForkDataTransfer rqst;
  ProcessForkerUtils::parsePayload(frame, rqst);

  // Log the contents of the incomming request
  std::list<log::Param> params;
  params.push_back(log::Param("unitName", rqst.unitname()));
357
358
  m_log(LOG_INFO, "ProcessForker handling ForkDataTransfer message", params);

359
360
361
362
363
  // Fork a data-transfer session
  const pid_t forkRc = fork();

  // If fork failed
  if(0 > forkRc) {
364
    return createExceptionResult(666,
365
      "Failed to fork data-transfer session for tape drive", true);
366
367
  // Else if this is the parent process
  } else if(0 < forkRc) {
368
    std::list<log::Param> params = {log::Param("pid", forkRc)};
369
370
    m_log(LOG_INFO, "ProcessForker forked data-transfer session", params);

371
    return createForkSucceededResult(forkRc, true);
372
373
374

  // Else this is the child process
  } else {
375
376
    closeCmdReceiverSocket();

377
    castor::utils::setProcessNameAndCmdLine(m_argv0, "transfer");
378

379
380
381
    try {
      exit(runDataTransferSession(rqst));
    } catch(castor::exception::Exception &ne) {
382
      std::list<log::Param> params = {log::Param("message", ne.getMessage().str())};
Steven Murray's avatar
Steven Murray committed
383
      m_log(LOG_ERR, "Data-transfer session failed", params);
384
    } catch(std::exception &ne) {
385
      std::list<log::Param> params = {log::Param("message", ne.what())};
Steven Murray's avatar
Steven Murray committed
386
      m_log(LOG_ERR, "Data-transfer session failed", params);
387
    } catch(...) {
388
      std::list<log::Param> params = {log::Param("message",
389
        "Caught an unknown exception")};
Steven Murray's avatar
Steven Murray committed
390
      m_log(LOG_ERR, "Data-transfer session failed", params);
391
    }
392
    exit(Session::CLEAN_DRIVE);
393
394
395
396
  }
}

//------------------------------------------------------------------------------
397
// handleForkLabelMsg
398
//------------------------------------------------------------------------------
399
400
401
402
403
404
405
406
407
408
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult 
  castor::tape::tapeserver::daemon::ProcessForker::handleForkLabelMsg(
  const ProcessForkerFrame &frame) {
  // Parse the incoming request
  messages::ForkLabel rqst;
  ProcessForkerUtils::parsePayload(frame, rqst);

  // Log the contents of the incomming request
  std::list<log::Param> params;
  params.push_back(log::Param("unitName", rqst.unitname()));
409
  params.push_back(log::Param("TPVID", rqst.vid()));
410
411
412
413
  m_log(LOG_INFO, "ProcessForker handling ForkLabel message", params);

  // Fork a label session
  const pid_t forkRc = fork();
414

415
416
  // If fork failed
  if(0 > forkRc) {
417
    return createExceptionResult(666,
418
      "Failed to fork label session for tape drive", true);
419
420
421

  // Else if this is the parent process
  } else if(0 < forkRc) {
422
    std::list<log::Param> params = {log::Param("pid", forkRc)};
423
424
    m_log(LOG_INFO, "ProcessForker forked label session", params);

425
    return createForkSucceededResult(forkRc, true);
426
427
428

  // Else this is the child process
  } else {
429
430
    closeCmdReceiverSocket();

431
    castor::utils::setProcessNameAndCmdLine(m_argv0, "label");
432

433
434
435
    try {
      exit(runLabelSession(rqst));
    } catch(castor::exception::Exception &ne) {
436
      std::list<log::Param> params = {log::Param("message", ne.getMessage().str())};
Steven Murray's avatar
Steven Murray committed
437
      m_log(LOG_ERR, "Label session failed", params);
438
    } catch(std::exception &ne) {
439
      std::list<log::Param> params = {log::Param("message", ne.what())};
Steven Murray's avatar
Steven Murray committed
440
      m_log(LOG_ERR, "Label session failed", params);
441
    } catch(...) {
442
      std::list<log::Param> params = {log::Param("message",
443
        "Caught an unknown exception")};
Steven Murray's avatar
Steven Murray committed
444
      m_log(LOG_ERR, "Label session failed", params);
445
    }
446
    exit(Session::CLEAN_DRIVE);
447
  }
448
449
450
451
452
453
454
455
456
457
458
459
460
461
}

//------------------------------------------------------------------------------
// handleStopProcessForkerMsg
//------------------------------------------------------------------------------
// Handles a StopProcessForker request. It logs the reason given by the
// requester and returns a return-value result of 0. That result tells the
// main event loop to stop.
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult 
  castor::tape::tapeserver::daemon::ProcessForker::
  handleStopProcessForkerMsg(const ProcessForkerFrame &frame) {
  messages::StopProcessForker stopRqst;
  ProcessForkerUtils::parsePayload(frame, stopRqst);

  // The ProcessForker will now gracefully stop: record why
  std::list<log::Param> params = {log::Param("reason", stopRqst.reason())};
  m_log(LOG_INFO, "Gracefully stopping ProcessForker", params);

  const bool continueMainEventLoop = false;
  return createReturnValueResult(0, continueMainEventLoop);
}
//------------------------------------------------------------------------------
// runCleanerSession
//------------------------------------------------------------------------------
471
472
castor::tape::tapeserver::daemon::Session::EndOfSessionAction
  castor::tape::tapeserver::daemon::ProcessForker::runCleanerSession(
473
474
  const messages::ForkCleaner &rqst) {
  try {
475
    castor::server::ProcessCap capUtils;
476

477
    const DriveConfig driveConfig = getDriveConfig(rqst);
478
    std::list<log::Param> params;
479
    params.push_back(log::Param("unitName", driveConfig.getUnitName()));
480
    params.push_back(log::Param("TPVID", rqst.vid()));
481
482
    params.push_back(log::Param("waitMediaInDrive", rqst.waitmediaindrive()));
    params.push_back(log::Param("waitMediaInDriveTimeout", rqst.waitmediaindrivetimeout()));
483
484
    m_log(LOG_INFO, "Cleaner-session child-process started", params);

485
486
487
488
    const int sizeOfIOThreadPoolForZMQ = 1;
    messages::SmartZmqContext
      zmqContext(instantiateZmqContext(sizeOfIOThreadPoolForZMQ));

489
    messages::AcsProxyZmq acs(acs::ACS_PORT, zmqContext.get());
490
491
492

    mediachanger::MmcProxyLog mmc(m_log);

493
494
495
    // The network timeout of rmc communications should be several minutes due
    // to the time it takes to mount and unmount tapes
    const int rmcNetTimeout = 600; // Timeout in seconds
496
497
498

    legacymsg::RmcProxyTcpIp rmc(m_config.rmcPort, rmcNetTimeout,
      m_config.rmcMaxRqstAttempts);
499
500
501

    mediachanger::MediaChangerFacade mediaChangerFacade(acs, mmc, rmc);

502
503
    castor::tape::System::realWrapper sWrapper;
    CleanerSession cleanerSession(
504
      capUtils,
505
      mediaChangerFacade,
506
507
      m_log,
      driveConfig,
508
      sWrapper,
509
      rqst.vid(),
510
511
      rqst.waitmediaindrive(),
      rqst.waitmediaindrivetimeout());
512
    return cleanerSession.execute();
Steven Murray's avatar
Steven Murray committed
513
  } catch(castor::exception::Exception &ex) {
514
515
516
    throw ex;
  } catch(std::exception &se) {
    castor::exception::Exception ex;
Steven Murray's avatar
Steven Murray committed
517
    ex.getMessage() << se.what();
518
519
    throw ex;
  } catch(...) {
Steven Murray's avatar
Steven Murray committed
520
521
    castor::exception::Exception ex;
    ex.getMessage() << "Caught an unknown exception";
522
523
524
525
    throw ex;
  }
}

526
527
528
//------------------------------------------------------------------------------
// runDataTransferSession
//------------------------------------------------------------------------------
529
530
castor::tape::tapeserver::daemon::Session::EndOfSessionAction
  castor::tape::tapeserver::daemon::ProcessForker::runDataTransferSession(
531
  const messages::ForkDataTransfer &rqst) {
532
  const DriveConfig driveConfig = getDriveConfig(rqst);
533
534

  std::list<log::Param> params;
535
  params.push_back(log::Param("unitName", driveConfig.getUnitName()));
536
537
  m_log(LOG_INFO, "Data-transfer child-process started", params);

538
  server::ProcessCap capUtils;
539

540
541
542
543
  const int sizeOfIOThreadPoolForZMQ = 1;
  messages::SmartZmqContext
    zmqContext(instantiateZmqContext(sizeOfIOThreadPoolForZMQ));

544
  messages::AcsProxyZmq acs(acs::ACS_PORT, zmqContext.get());
545
546
547

  mediachanger::MmcProxyLog mmc(m_log);

548
549
550
  // The network timeout of rmc communications should be several minutes due
  // to the time it takes to mount and unmount tapes
  const int rmcNetTimeout = 600; // Timeout in seconds
551
552
553

  legacymsg::RmcProxyTcpIp rmc(m_config.rmcPort, rmcNetTimeout,
    m_config.rmcMaxRqstAttempts);
554

555
556
  mediachanger::MediaChangerFacade mediaChangerFacade(acs, mmc, rmc);

557
558
  messages::TapeserverProxyZmq tapeserver(m_log, m_config.internalPort,
    zmqContext.get());
559
  
560
  cta::EosNS eosNs(castor::common::CastorConfiguration::getConfig().getConfEntString("TapeServer", "EOSRemoteHostAndPort"));
561
562
563
564
565
566
567
568
569
  std::unique_ptr<cta::objectstore::Backend> backend(
    cta::objectstore::BackendFactory::createBackend(
      castor::common::CastorConfiguration::getConfig().getConfEntString("TapeServer", "ObjectStoreBackendPath"))
        .release());
  // If the backend is a VFS, make sure we don't delete it on exit.
  // If not, nevermind.
  try {
    dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
  } catch (std::bad_cast &){}
570
571
  cta::objectstore::BackendPopulator backendPopulator(*backend);
  cta::OStoreDBWithAgent osdb(*backend, backendPopulator.getAgent());
572
  const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile("/etc/cta/cta_catalogue_db.conf");
573
  const uint64_t nbConns = 1;
574
  std::unique_ptr<cta::catalogue::Catalogue> catalogue(cta::catalogue::CatalogueFactory::create(catalogueLogin, nbConns));
575
  cta::Scheduler scheduler(*catalogue, osdb, 5, 2*1000*1000); //TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them
576
577
578
579
580

  castor::tape::System::realWrapper sysWrapper;

  // This try bloc will allow us to send a failure notification to the client
  // if we fail before the DataTransferSession has an opportunity to do so.
581
  std::unique_ptr<DataTransferSession> dataTransferSession;
582
583
584
585
586
587
  try {
    dataTransferSession.reset(new DataTransferSession (
      m_hostName,
      m_log,
      sysWrapper,
      driveConfig,
588
      mediaChangerFacade,
589
590
      tapeserver,
      capUtils,
591
592
      m_config.dataTransfer,
      scheduler));
593
  } catch (castor::exception::Exception & ex) {
594
595
596
    params.push_back(log::Param("errorMessage", ex.getMessageValue()));
    params.push_back(log::Param("errorCode", ex.code()));
    m_log(LOG_ERR, "Failed to set up the data-transfer session", params);
597
598
    throw;
  } catch (...) {
599
    m_log(LOG_ERR, "Got non castor exception error while constructing data-transfer session", params);
600
601
    throw;
  }
Steven Murray's avatar
Steven Murray committed
602
603
604
605
606
607
608
609
610
611
612
613
614
615

  try {
    return dataTransferSession->execute();
  } catch(castor::exception::Exception &ex) {
    throw ex;
  } catch(std::exception &se) {
    castor::exception::Exception ex;
    ex.getMessage() << se.what();
    throw ex;
  } catch(...) {
    castor::exception::Exception ex;
    ex.getMessage() << "Caught an unknown exception";
    throw ex;
  }
616
617
618
619
620
621
622
623
624
}

//------------------------------------------------------------------------------
// instantiateZmqContext
//------------------------------------------------------------------------------
void *castor::tape::tapeserver::daemon::ProcessForker::instantiateZmqContext(
  const int sizeOfIOThreadPoolForZMQ) {
  void *const zmqContext = zmq_init(sizeOfIOThreadPoolForZMQ);
  if(NULL == zmqContext) {
625
    const std::string message = castor::utils::errnoToString(errno);
626
    castor::exception::Exception ex;
627
628
    ex.getMessage() << "Child of ProcessForker failed to instantiate ZMQ"
      " context: " << message;
629
630
    throw ex;
  }
631

632
633
  std::ostringstream contextAddr;
  contextAddr << std::hex << zmqContext;
634
  std::list<log::Param> params = {log::Param("contextAddr", contextAddr.str())};
635
  m_log(LOG_INFO, "Child of ProcessForker instantiated a ZMQ context", params);
636
637
638
639

  return zmqContext;
}

640
641
642
643
644
//------------------------------------------------------------------------------
// handlePendingSignals
//------------------------------------------------------------------------------
bool castor::tape::tapeserver::daemon::ProcessForker::handlePendingSignals() {
  try {
645
    // Handle a pending SIGCHLD by reaping the associated zombie(s)
646
647
    reapZombies();

648
649
    // For now there are no signals that require a gracefully shutdown of the
    // main loop of the ProcessForker
650
651
652
653
654
655
656
657
658
    return true; // The main event loop should continue
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle pending signals: " <<
      ne.getMessage().str();
    throw ex;
  }
}

659
660
661
//------------------------------------------------------------------------------
// reapZombies
//------------------------------------------------------------------------------
662
void castor::tape::tapeserver::daemon::ProcessForker::reapZombies() {
663
664
665
666
667
668
669
670
671
672
673
  pid_t pid = 0;
  int waitpidStat = 0;
  while (0 < (pid = waitpid(-1, &waitpidStat, WNOHANG))) {
    handleReapedZombie(pid, waitpidStat);
  }
}

//------------------------------------------------------------------------------
// handleReapedZombie
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::ProcessForker::handleReapedZombie(
674
  const pid_t pid, const int waitpidStat) {
675
676
677
  try {
    logChildProcessTerminated(pid, waitpidStat);
    notifyTapeDaemonOfTerminatedProcess(pid, waitpidStat);
678
679
680
681
682
683
684
685
686
687
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle reaped zombie: pid=" << pid <<
      ne.getMessage().str();
    throw ex;
  } catch(std::exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle reaped zombie: pid=" << pid <<
      ne.what();
    throw ex;
688
  } catch(...) {
689
690
691
692
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to handle reaped zombie: pid=" << pid <<
      ": Caught an unknown exception";
    throw ex;
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
  }
} 

//------------------------------------------------------------------------------
// logChildProcessTerminated
//------------------------------------------------------------------------------
// Logs, at LOG_INFO, how the child process pid terminated. The waitpid()
// status is decoded with the <sys/wait.h> macros.
//
// WCOREDUMP is only meaningful when WIFSIGNALED is true (see wait(2)), so the
// core-dump flag is reported as "false" for any other kind of status.
void castor::tape::tapeserver::daemon::ProcessForker::logChildProcessTerminated(
  const pid_t pid, const int waitpidStat) throw() {
  std::list<log::Param> params;
  params.push_back(log::Param("terminatedPid", pid));

  if(WIFEXITED(waitpidStat)) {
    params.push_back(log::Param("WEXITSTATUS", WEXITSTATUS(waitpidStat)));
  }

  const bool signaled = WIFSIGNALED(waitpidStat);
  if(signaled) {
    params.push_back(log::Param("WTERMSIG", WTERMSIG(waitpidStat)));
  }

  if(signaled && WCOREDUMP(waitpidStat)) {
    params.push_back(log::Param("WCOREDUMP", "true"));
  } else {
    params.push_back(log::Param("WCOREDUMP", "false"));
  }

  if(WIFSTOPPED(waitpidStat)) {
    params.push_back(log::Param("WSTOPSIG", WSTOPSIG(waitpidStat)));
  }

  if(WIFCONTINUED(waitpidStat)) {
    params.push_back(log::Param("WIFCONTINUED", "true"));
  } else {
    params.push_back(log::Param("WIFCONTINUED", "false"));
  }

  m_log(LOG_INFO, "ProcessForker child process terminated", params);
}

//------------------------------------------------------------------------------
// notifyTapeDaemonOfTerminatedProcess
//------------------------------------------------------------------------------
// Tells the TapeDaemon that child process pid has terminated:
//   - a normal exit is forwarded to notifyTapeDaemonOfExitedProcess();
//   - termination by a signal is forwarded to
//     notifyTapeDaemonOfCrashedProcess();
//   - any other waitpid() status is treated as an error.
//
// @throw castor::exception::Exception if the status is neither an exit nor a
// termination by signal, or if a castor exception is raised while notifying.
// The message identifies the pid.
void castor::tape::tapeserver::daemon::ProcessForker::
  notifyTapeDaemonOfTerminatedProcess(const pid_t pid, const int waitpidStat) {
  try {
    if(WIFEXITED(waitpidStat)) {
      notifyTapeDaemonOfExitedProcess(pid, waitpidStat);
    } else if(WIFSIGNALED(waitpidStat)) {
      notifyTapeDaemonOfCrashedProcess(pid, waitpidStat);
    } else {
      // The pid is added by the wrapping exception below, so report the raw
      // status here instead of repeating it
      castor::exception::Exception ex;
      ex.getMessage() << "Process died of unknown causes: waitpidStat=" <<
        waitpidStat;
      throw ex;
    }
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify TapeDaemon of process termination"
      ": pid=" << pid << ": " << ne.getMessage().str();
    throw ex;
  }
}

//------------------------------------------------------------------------------
// notifyTapeDaemonOfExitedProcess
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::ProcessForker::
  notifyTapeDaemonOfExitedProcess(const pid_t pid, const int waitpidStat) {
  try {
    messages::ProcessExited msg;
    msg.set_pid(pid);
    msg.set_exitcode(WEXITSTATUS(waitpidStat));

764
    std::list<log::Param> params = {
765
766
767
768
769
      log::Param("pid", msg.pid()),
      log::Param("exitCode", msg.exitcode()),
      log::Param("payloadLen", msg.ByteSize())};
    m_log(LOG_INFO, "ProcessForker notifying TapeDaemon of process exit",
      params);
770

771
    ProcessForkerUtils::writeFrame(m_reaperSocket, msg, &m_log);
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify TapeDaemon of process exit: " <<
      ne.getMessage().str();
    throw ex;
  } catch(std::exception &se) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify TapeDaemon of process exit: " <<
      se.what();
    throw ex;
  } catch(...) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify TapeDaemon of process exit: "
      "Caught an unknown exception";
    throw ex;
  }
}

//------------------------------------------------------------------------------
// notifyTapeDaemonOfCrashedProcess
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::ProcessForker::
  notifyTapeDaemonOfCrashedProcess(const pid_t pid, const int waitpidStat) {
  try {
    messages::ProcessCrashed msg;
    msg.set_pid(pid);
    msg.set_signal(WTERMSIG(waitpidStat));

800
    std::list<log::Param> params = {log::Param("pid", msg.pid()),
801
      log::Param("signal", msg.signal())};
802
    m_log(LOG_WARNING, "ProcessForker notifying TapeDaemon of process crash",
803
      params);
804

805
    ProcessForkerUtils::writeFrame(m_reaperSocket, msg, &m_log);
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify TapeDaemon of process crash: " <<
      ne.getMessage().str();
    throw ex;
  } catch(std::exception &se) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify TapeDaemon of process crash: " <<
      se.what();
    throw ex;
  } catch(...) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to notify TapeDaemon of process crash: "
      "Caught an unknown exception";
    throw ex;
  }
}
823
824
825
826

//------------------------------------------------------------------------------
// runLabelSession
//------------------------------------------------------------------------------
827
828
castor::tape::tapeserver::daemon::Session::EndOfSessionAction
  castor::tape::tapeserver::daemon::ProcessForker::runLabelSession(
829
830
  const messages::ForkLabel &rqst) {
  try {
831
832
    server::ProcessCap capUtils;

833
    const DriveConfig &driveConfig = getDriveConfig(rqst);
834
835
836
    const legacymsg::TapeLabelRqstMsgBody labelJob = getLabelJob(rqst);

    std::list<log::Param> params;
837
    params.push_back(log::Param("unitName", driveConfig.getUnitName()));
838
    params.push_back(log::Param("TPVID", labelJob.vid));
839
840
    m_log(LOG_INFO, "Label-session child-process started", params);

841
842
843
    const int sizeOfIOThreadPoolForZMQ = 1;
    messages::SmartZmqContext
      zmqContext(instantiateZmqContext(sizeOfIOThreadPoolForZMQ));
844
845
    messages::TapeserverProxyZmq tapeserver(m_log, m_config.internalPort,
      zmqContext.get());
846

847
    messages::AcsProxyZmq acs(acs::ACS_PORT, zmqContext.get());
848
849
850

    mediachanger::MmcProxyLog mmc(m_log);

851
852
853
    // The network timeout of rmc communications should be several minutes due
    // to the time it takes to mount and unmount tapes
    const int rmcNetTimeout = 600; // Timeout in seconds
854
855
856

    legacymsg::RmcProxyTcpIp rmc(m_config.rmcPort, rmcNetTimeout,
      m_config.rmcMaxRqstAttempts);
857
858
859

    mediachanger::MediaChangerFacade mediaChangerFacade(acs, mmc, rmc);

860
861
    castor::tape::System::realWrapper sWrapper;
    LabelSession labelsession(
862
      capUtils,
863
      tapeserver,
864
      mediaChangerFacade,
865
866
867
868
      labelJob,
      m_log,
      sWrapper,
      driveConfig,
869
870
871
      rqst.force(),
      rqst.lbp(),
      m_config.labelSession);
872
    return labelsession.execute();
Steven Murray's avatar
Steven Murray committed
873
874
  } catch(castor::exception::Exception &ex) {
    throw ex;
875
876
  } catch(std::exception &se) {
    castor::exception::Exception ex;
877
    ex.getMessage() << se.what();
878
879
880
    throw ex;
  } catch(...) {
        castor::exception::Exception ex;
881
    ex.getMessage() << "Caught an unknown exception";
882
883
884
885
886
887
888
889
890
891
892
    throw ex;
  }
}

//------------------------------------------------------------------------------
// getLabelJob
//
// Converts a ForkLabel message into the legacy TapeLabelRqstMsgBody
// structure. The boolean lbp and force flags become 1/0, uid and gid are
// copied, and the VID, unit name and logical library are copied into the
// structure's character fields with castor::utils::copyString().
//------------------------------------------------------------------------------
castor::legacymsg::TapeLabelRqstMsgBody
  castor::tape::tapeserver::daemon::ProcessForker::getLabelJob(
  const messages::ForkLabel &msg) {
  castor::legacymsg::TapeLabelRqstMsgBody job;
  job.lbp = msg.lbp() ? 1 : 0;
  job.force = msg.force() ? 1 : 0;
  job.uid = msg.uid();
  job.gid = msg.gid();
  castor::utils::copyString(job.vid, msg.vid());
  castor::utils::copyString(job.drive, msg.unitname());
  castor::utils::copyString(job.logicalLibrary, msg.logicallibrary());
  return job;
}

//------------------------------------------------------------------------------
// createForkSucceededResult
//
// Returns a MsgHandlerResult whose reply is a serialized ForkSucceeded
// message containing pid. continueMainEventLoop is copied into the result
// unchanged. A castor::exception::Exception from serialization is rethrown
// with added context.
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
  castor::tape::tapeserver::daemon::ProcessForker::createForkSucceededResult(
  const pid_t pid, const bool continueMainEventLoop) {
  try {
    messages::ForkSucceeded reply;
    reply.set_pid(pid);

    MsgHandlerResult result;
    result.continueMainEventLoop = continueMainEventLoop;
    ProcessForkerUtils::serializePayload(result.reply, reply);

    return result;
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    // ": " separates this context from the nested error, as elsewhere
    ex.getMessage() <<
      "Failed to create MsgHandlerResult containing a ForkSucceeded message: "
      << ne.getMessage().str();
    throw ex;
  }
}

//------------------------------------------------------------------------------
// createExceptionResult
//
// Returns a MsgHandlerResult whose reply is a serialized Exception message
// carrying the given code and message text. continueMainEventLoop is copied
// into the result unchanged. A castor::exception::Exception from serialization
// is rethrown with added context.
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
  castor::tape::tapeserver::daemon::ProcessForker::
  createExceptionResult(const uint32_t code, const std::string& message,
    const bool continueMainEventLoop) {
  try {
    messages::Exception reply;
    reply.set_code(code);
    reply.set_message(message);

    MsgHandlerResult result;
    result.continueMainEventLoop = continueMainEventLoop;
    ProcessForkerUtils::serializePayload(result.reply, reply);

    return result;
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    // ": " separates this context from the nested error, as elsewhere
    ex.getMessage() <<
      "Failed to create MsgHandlerResult containing an Exception message: "
      << ne.getMessage().str();
    throw ex;
  }
}

//------------------------------------------------------------------------------
// createReturnValueResult
//
// Returns a MsgHandlerResult whose reply is a serialized ReturnValue message
// carrying value. continueMainEventLoop is copied into the result unchanged.
// A castor::exception::Exception from serialization is rethrown with added
// context.
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
  castor::tape::tapeserver::daemon::ProcessForker::
  createReturnValueResult(const uint32_t value,
    const bool continueMainEventLoop) {
  try {
    messages::ReturnValue reply;
    reply.set_value(value);

    MsgHandlerResult result;
    result.continueMainEventLoop = continueMainEventLoop;
    ProcessForkerUtils::serializePayload(result.reply, reply);

    return result;
  } catch(castor::exception::Exception &ne) {
    castor::exception::Exception ex;
    // ": " separates this context from the nested error, as elsewhere
    ex.getMessage() <<
      "Failed to create MsgHandlerResult containing ReturnValue message: "
      << ne.getMessage().str();
    throw ex;
  }
}