DriveHandler.cpp 62.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include "catalogue/CatalogueFactoryFactory.hpp"
20
21
#include "common/log/LogContext.hpp"
#include "common/exception/Errnum.hpp"
22
#include "common/processCap/ProcessCap.hpp"
23
24
#include "DriveHandler.hpp"
#include "DriveHandlerProxy.hpp"
25
26
27
28
#include "objectstore/Backend.hpp"
#include "objectstore/BackendFactory.hpp"
#include "objectstore/BackendVFS.hpp"
#include "objectstore/BackendPopulator.hpp"
29
#include "objectstore/AgentHeartbeatThread.hpp"
30
#include "rdbms/Login.hpp"
31
32
33
#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/CleanerSession.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.hpp"
34
#include "tapeserver/castor/tape/tapeserver/daemon/Session.hpp"
35
36
#include "tapeserver/daemon/WatchdogMessage.pb.h"

37
38
39
40
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <set>
Eric Cano's avatar
Eric Cano committed
41
#include <sys/prctl.h>
42
43
44

namespace cta { namespace tape { namespace  daemon {

45
46
CTA_GENERATE_EXCEPTION_CLASS(DriveAlreadyExistException);
  
47
48
49
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
50
51
52
53
54
55
56
DriveHandler::DriveHandler(const TapedConfiguration & tapedConfig, const TpconfigLine& configline, ProcessManager& pm)
  : SubprocessHandler(std::string("drive:") + configline.unitName),
    m_processManager(pm),
    m_tapedConfig(tapedConfig),
    m_configLine(configline),
    m_sessionEndContext(m_processManager.logContext().logger())
{
  // No subprocess is created here. The first fork is requested from the process manager
  // through getInitialStatus(), which sets forkRequested.
}
57
58
59
60

using session::SessionState;
using session::SessionType;

61
62
63
64
65
66
67
68
69
70
71
72
//------------------------------------------------------------------------------
// DriveHandler::m_stateChangeTimeouts
//------------------------------------------------------------------------------
// The following structure represents the timeouts expected for each session state transition
// (if needed).
// The session type is not taken into account, as a given state gets the same timeout regardless
// of the session type it is used in.
const std::map<SessionState, DriveHandler::Timeout> DriveHandler::m_stateChangeTimeouts = {
  // Determining that the drive is ready takes 1 minute, so waiting 2 should be enough.
  { SessionState::Checking,       std::chrono::duration_cast<Timeout>(std::chrono::minutes{2}) },
  // Scheduling is expected to take little time, so 5 minutes is plenty. When the scheduling
  // determines there is nothing to do, it transitions to the same state (resetting the timeout).
  { SessionState::Scheduling,     std::chrono::duration_cast<Timeout>(std::chrono::minutes{5}) },
  // Mounting (mount + load, in fact) is expected to take no more than 10 minutes.
  { SessionState::Mounting,       std::chrono::duration_cast<Timeout>(std::chrono::minutes{10}) },
  // Like mounting, unmounting is expected to take less than 10 minutes.
  { SessionState::Unmounting,     std::chrono::duration_cast<Timeout>(std::chrono::minutes{10}) },
  // Draining to disk is given a grace period of 30 minutes for changing state.
  { SessionState::DrainingToDisk, std::chrono::duration_cast<Timeout>(std::chrono::minutes{30}) },
  // The process is expected to exit shortly after getting into this state (nominally within
  // 5 minutes). This state potentially covers the draining of metadata to central storage
  // (if not faster than unmounting the tape).
  // TODO: this is set temporarily to 15 minutes while the reporting is not yet parallelized.
  { SessionState::ShuttingDown,   std::chrono::duration_cast<Timeout>(std::chrono::minutes{15}) }
};

//------------------------------------------------------------------------------
// DriveHandler::m_heartbeatTimeouts
//------------------------------------------------------------------------------
// Heartbeat timeouts for each session state involving data movement.
// TODO: decide on values.
const std::map<SessionState, DriveHandler::Timeout> DriveHandler::m_heartbeatTimeouts = {
  { SessionState::Running,        std::chrono::duration_cast<Timeout>(std::chrono::minutes{1}) },
  { SessionState::DrainingToDisk, std::chrono::duration_cast<Timeout>(std::chrono::minutes{1}) }
};

//------------------------------------------------------------------------------
// DriveHandler::m_dataMovementTimeouts
//------------------------------------------------------------------------------
// Data movement timeouts for the session states involving data movements and heartbeats.
// TODO: decide on values. TODO: bumped up to 15 minutes for the time being as the
// efficient retrieve requests pop is not implemented yet.
const std::map<SessionState, DriveHandler::Timeout> DriveHandler::m_dataMovementTimeouts = {
  { SessionState::Running,        std::chrono::duration_cast<Timeout>(std::chrono::minutes{15}) },
  { SessionState::DrainingToDisk, std::chrono::duration_cast<Timeout>(std::chrono::minutes{15}) }
};
108

109
110
111
//------------------------------------------------------------------------------
// DriveHandler::getInitialStatus
//------------------------------------------------------------------------------
112
113
114
115
116
117
SubprocessHandler::ProcessingStatus DriveHandler::getInitialStatus() {
  // Nothing runs for this drive yet: ask the process manager to fork the first
  // subprocess right away, and report the handler's status (returned by value).
  auto & initialStatus = m_processingStatus;
  initialStatus.forkRequested = true;
  return initialStatus;
}

118
119
120
//------------------------------------------------------------------------------
// DriveHandler::postForkCleanup
//------------------------------------------------------------------------------
121
122
123
124
125
126
void DriveHandler::postForkCleanup() {
  // Called in a child process that was forked for another handler. This handler's socket
  // pair is of no use there, so it is simply released, without going through
  // m_processManager.removeFile() first.
  m_socketPair.reset();
}

127
128
129
//------------------------------------------------------------------------------
// DriveHandler::fork
//------------------------------------------------------------------------------
130
131
132
133
134
135
136
137
138
139
SubprocessHandler::ProcessingStatus DriveHandler::fork() {
  // Forks the drive subprocess.
  // - In the child: returns a fresh status flagged ForkState::child.
  // - In the parent: closes the child side of the socket pair, registers the remaining side
  //   with the process manager and returns the handler status flagged ForkState::parent.
  // On any failure, we declare ourselves done and send SIGTERM to our own process. This lets
  // the signal handler run the regular shutdown-kill sequence without duplicating it here.
  m_processingStatus.forkRequested = false;  // the fork request is being served now
  try {
    // Sanity check: forking is only valid from PendingFork.
    if (m_sessionState != SessionState::PendingFork) {
      std::stringstream err;
      err << "In DriveHandler::fork(): called while not in the expected state: " << session::toString(m_sessionState)
          << " instead of " << session::toString(SessionState::PendingFork);
      throw exception::Exception(err.str());
    }
    // A fresh socket pair carries messages between the new subprocess and this handler.
    m_socketPair.reset(new cta::server::SocketPair());
    m_pid = ::fork();
    exception::Errnum::throwOnMinusOne(m_pid, "In DriveHandler::fork(): failed to fork()");
    if (m_pid == 0) {
      // Child process: hand control back with a default status marked as child.
      SubprocessHandler::ProcessingStatus childStatus;
      childStatus.forkState = SubprocessHandler::ForkState::child;
      return childStatus;
    }
    // Parent process from here on.
    m_processingStatus.forkState = SubprocessHandler::ForkState::parent;
    // The new subprocess has not reported anything yet. We stay in PendingFork until its
    // first message (handled by processStartingUp()).
    m_sessionState = SessionState::PendingFork;
    m_processingStatus.nextTimeout = nextTimeout();
    // Close the child side in this process, then register our side for polling.
    m_socketPair->close(server::SocketPair::Side::child);
    m_processManager.addFile(m_socketPair->getFdForAccess(server::SocketPair::Side::child), this);
    // Ready to react to timeouts and messages from the child process.
    return m_processingStatus;
  } catch (cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer errorParams(m_processManager.logContext());
    errorParams.add("tapeDrive", m_configLine.unitName);
    errorParams.add("Error", ex.getMessageValue());
    m_processManager.logContext().log(log::ERR, "Failed to fork drive process. Initiating shutdown with SIGTERM.");
    // Forget all previous state: we are shutting down.
    m_processingStatus = SubprocessHandler::ProcessingStatus();
    m_sessionState = SessionState::Shutdown;
    m_processingStatus.shutdownComplete = true;
    m_processingStatus.forkState = SubprocessHandler::ForkState::parent;
    // Let the signal handler drive the shutdown.
    ::kill(::getpid(), SIGTERM);
    return m_processingStatus;
  }
}

184
185
186
//------------------------------------------------------------------------------
// DriveHandler::nextTimeout
//------------------------------------------------------------------------------
187
188
decltype (SubprocessHandler::ProcessingStatus::nextTimeout) DriveHandler::nextTimeout() {
  // If a timeout is defined, then we compute its expiration time. Else we just give default timeout (end of times).
189
  decltype (SubprocessHandler::ProcessingStatus::nextTimeout) ret=decltype(SubprocessHandler::ProcessingStatus::nextTimeout)::max();
190
  bool retSet=false;
191
  try {
192
    ret=m_lastStateChangeTime+m_stateChangeTimeouts.at(m_sessionState);
193
194
    retSet=true;
    m_timeoutType="StateChange";
195
196
  } catch (...) {}
  try {
197
198
199
200
201
202
    auto newRet=m_lastHeartBeatTime+m_heartbeatTimeouts.at(m_sessionState);
    if (newRet < ret) {
      ret=newRet;
      retSet=true;
      m_timeoutType="Heartbeat";
    }
203
204
  } catch (...) {}
  try {
205
206
207
208
209
210
    auto newRet=m_lastDataMovementTime+m_dataMovementTimeouts.at(m_sessionState);
    if (newRet < ret) {
      ret=newRet;
      retSet=true;
      m_timeoutType="DataMovement";
    }
211
  } catch (...) {}
212
213
214
215
  if (retSet) {
    m_sessionStateWhenTimeoutDecided=m_sessionState;
    m_sessionTypeWhenTimeoutDecided=m_sessionType;
  }
216
217
218
219
220
221
222
223
224
225
  {
    log::ScopedParamContainer params(m_processManager.logContext());
    params.add("TimeoutType", m_timeoutType)
          .add("LastStateChangeTime", std::chrono::duration_cast<std::chrono::seconds>(m_lastStateChangeTime.time_since_epoch()).count())
          .add("LastHeartBeatTime", std::chrono::duration_cast<std::chrono::seconds>(m_lastHeartBeatTime.time_since_epoch()).count())
          .add("LastDataMovementTime", std::chrono::duration_cast<std::chrono::seconds>(m_lastDataMovementTime.time_since_epoch()).count())
          .add("Now", std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count())
          .add("Timeout", std::chrono::duration_cast<std::chrono::seconds>(ret.time_since_epoch()).count());
    m_processManager.logContext().log(log::DEBUG, "Computed new timeout");
  }
226
  return ret;
227
228
}

229
230
231
//------------------------------------------------------------------------------
// DriveHandler::kill
//------------------------------------------------------------------------------
232
233
void DriveHandler::kill() {
  // If we have a subprocess, kill it and wait for completion (if needed). We do not need to keep
234
235
  // track of the exit state as kill() means we will not be called anymore.
  log::ScopedParamContainer params(m_processManager.logContext());
236
  params.add("tapeDrive", m_configLine.unitName);
237
  if (m_pid != -1) {
238
    params.add("SubProcessId", m_pid);
239
240
241
242
243
    // The socket pair will be reopened on the next fork. Clean it up.
    if (m_socketPair.get()) {
      m_processManager.removeFile(m_socketPair->getFdForAccess(server::SocketPair::Side::child));
      m_socketPair.reset(nullptr);
    }
244
245
246
247
248
249
250
251
252
253
254
255
256
    try {
      exception::Errnum::throwOnMinusOne(::kill(m_pid, SIGKILL),"Failed to kill() subprocess");
      int status;
      // wait for child process exit
      exception::Errnum::throwOnMinusOne(::waitpid(m_pid, &status, 0), "Failed to waitpid() subprocess");
      // Log the status
      params.add("WIFEXITED", WIFEXITED(status));
      if (WIFEXITED(status)) {
        params.add("WEXITSTATUS", WEXITSTATUS(status));
      } else {
        params.add("WIFSIGNALED", WIFSIGNALED(status));
      } 
      m_processManager.logContext().log(log::INFO, "In DriveHandler::kill(): sub process completed");
257
258
259
      m_sessionEndContext.pushOrReplace({"Error_sessionKilled", "1"});
      m_sessionEndContext.pushOrReplace({"killSignal", WTERMSIG(status)});
      m_sessionEndContext.pushOrReplace({"status", "failure"});
260
      m_sessionEndContext.pushOrReplace({"tapeDrive",m_configLine.unitName});
261
      m_sessionEndContext.log(cta::log::INFO, "Tape session finished");
262
      m_sessionEndContext.clear();
263
      m_pid=-1;
264
265
266
    } catch (exception::Exception & ex) {
      params.add("Exception", ex.getMessageValue());
      m_processManager.logContext().log(log::ERR, "In DriveHandler::kill(): failed to kill existing subprocess");
267
268
269
270
271
272
    }
  } else {
    m_processManager.logContext().log(log::INFO, "In DriveHandler::kill(): no subprocess to kill");
  }
}

273
274
275
//------------------------------------------------------------------------------
// DriveHandler::processEvent
//------------------------------------------------------------------------------
276
SubprocessHandler::ProcessingStatus DriveHandler::processEvent() {
  // Reads one watchdog message from the socket pair to the subprocess, then:
  //  - always processes the embedded logs,
  //  - processes the byte report, if any (this is a heartbeat),
  //  - dispatches a reported state change to the matching process*() handler, whose result
  //    is returned.
  // A peer disconnection closes and unregisters the socket pair (the child's SIGCHLD is then
  // expected). Other errors are logged. In both cases the current status is returned.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  // Read from the socket pair
  try {
    serializers::WatchdogMessage message;
    auto datagram=m_socketPair->receive();
    if (!message.ParseFromString(datagram)) {
      // Use the tolerant parser so that InitializationErrorString() can describe the problem.
      message.ParsePartialFromString(datagram);
      throw cta::exception::Exception(std::string("In DriveHandler::processEvent(): could not parse message: ")+
        message.InitializationErrorString());
    }
    // Logs are processed in all cases
    processLogs(message);
    // If we report bytes, process the report (this is a heartbeat)
    if (message.reportingbytes()) {
      processBytes(message);
    }
    // If we report a state change, process it (last as this can change the return value)
    if (message.reportingstate()) {
      const auto newState = static_cast<SessionState>(message.sessionstate());
      switch(newState) {
      case SessionState::StartingUp:
        return processStartingUp(message);
      case SessionState::Checking:
        return processChecking(message);
      case SessionState::Scheduling:
        return processScheduling(message);
      case SessionState::Mounting:
        return processMounting(message);
      case SessionState::Running:
        return processRunning(message);
      case SessionState::Unmounting:
        return processUnmounting(message);
      case SessionState::DrainingToDisk:
        return processDrainingToDisk(message);
      case SessionState::ShuttingDown:
        return processShutingDown(message);
      case SessionState::Fatal:
        return processFatal(message);
      default:
        {
          exception::Exception ex;
          ex.getMessage() << "In DriveHandler::processEvent(): unexpected session state:" 
              << session::toString(newState);
          throw ex;
        }
      }
    }
    return m_processingStatus;
  } catch(cta::server::SocketPair::PeerDisconnected & ex) {
    // The peer disconnected: close the socket pair and remove it from the epoll list.
    if (m_socketPair.get()) {
      m_processManager.removeFile(m_socketPair->getFdForAccess(server::SocketPair::Side::child));
      m_socketPair.reset(nullptr);
    } else {
      m_processManager.logContext().log(log::ERR, 
        "In DriveHandler::processEvent(): internal error. Got a peer disconnect with no socketPair object");
    }
    // We expect to be woken up by the child's signal.
    cta::log::ScopedParamContainer disconnectParams(m_processManager.logContext());
    disconnectParams.add("Message", ex.getMessageValue());
    m_processManager.logContext().log(log::DEBUG, 
        "In DriveHandler::processEvent(): Got a peer disconnect: closing socket and waiting for SIGCHILD");
    return m_processingStatus;
  } catch(cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer errorParams(m_processManager.logContext());
    errorParams.add("Message", ex.getMessageValue());
    m_processManager.logContext().log(log::ERR, 
        "In DriveHandler::processEvent(): failed");
    return m_processingStatus;
  }
}

350
351
352
353
354
355
//------------------------------------------------------------------------------
// DriveHandler::resetToDefault
//------------------------------------------------------------------------------
void DriveHandler::resetToDefault(PreviousSession previousSessionState) {
  // Returns the handler to its pre-fork state (PendingFork, undetermined type, no timeouts,
  // all processing flags cleared). The type, state and VID of the session that just ended
  // are remembered as the "previous" session.
  // previousSessionState: outcome of the previous session, stored in m_previousSession.
  m_pid=-1;
  m_previousSession=previousSessionState;
  // Snapshot the ending session before the fields below are reset. These must copy from
  // the current session; self-assignment would keep stale data from an older session.
  m_previousType=m_sessionType;
  m_previousState=m_sessionState;
  m_previousVid=m_sessionVid;
  m_sessionState=SessionState::PendingFork;
  m_sessionType=SessionType::Undetermined;
  m_lastStateChangeTime=decltype(m_lastStateChangeTime)::min();
  m_lastHeartBeatTime=decltype(m_lastHeartBeatTime)::min();
  m_lastDataMovementTime=decltype(m_lastDataMovementTime)::min();
  m_processingStatus.forkRequested = false;
  m_processingStatus.killRequested = false;
  m_processingStatus.nextTimeout = m_processingStatus.nextTimeout.max();
  m_processingStatus.shutdownComplete = false;
  m_processingStatus.shutdownRequested = false;
  m_processingStatus.sigChild = false;
}

//------------------------------------------------------------------------------
// DriveHandler::processStartingUp
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus DriveHandler::processStartingUp(serializers::WatchdogMessage& message) {
  // We expect to reach this state from pending fork. This is the first signal from the new process.
  // Check the transition is expected. This is non-fatal as the drive session has the last word anyway.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  if (m_sessionState!=SessionState::PendingFork || m_sessionType!=SessionType::Undetermined) {
    params.add("ExpectedState", session::toString(SessionState::PendingFork))
          .add("ActualState", session::toString(m_sessionState))
          .add("ExpectedType", session::toString(SessionType::Undetermined))
          .add("ActualType", session::toString(m_sessionType));
    m_processManager.logContext().log(log::WARNING,
      "In DriveHandler::processStartingUp(): unexpected previous state/type.");
  }
  // Record the new state. The subprocess is alive and starting: it must NOT be marked as
  // killed, nor the handler as shut down.
  m_sessionState=static_cast<SessionState>(message.sessionstate());
  m_sessionType=static_cast<SessionType>(message.sessiontype());
  m_sessionVid="";
  // Set the timeout for this state, like the other state handlers do.
  m_lastStateChangeTime=std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout=nextTimeout();
  return m_processingStatus;
}

//------------------------------------------------------------------------------
// DriveHandler::processScheduling
//------------------------------------------------------------------------------
404
SubprocessHandler::ProcessingStatus DriveHandler::processScheduling(serializers::WatchdogMessage& message) {
  // The subprocess reports that it is looking for work. This is expected right after start-up,
  // or as a repeated Scheduling report, with no session type decided yet (neither before nor
  // after). Deviations are only logged: the subprocess has the last word.
  // Every report re-arms the state change timeout.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  const auto reportedState = static_cast<SessionState>(message.sessionstate());
  const auto reportedType = static_cast<SessionType>(message.sessiontype());
  const bool fromExpectedState =
      (m_sessionState == SessionState::StartingUp) || (m_sessionState == SessionState::Scheduling);
  const bool typeUndetermined =
      (m_sessionType == SessionType::Undetermined) && (reportedType == SessionType::Undetermined);
  const bool expectedTransition = fromExpectedState && typeUndetermined;
  // Log unexpected transitions (warning) and genuine state changes (debug); stay quiet on
  // repeated Scheduling reports.
  if (!expectedTransition || m_sessionState != SessionState::Scheduling) {
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(reportedState))
          .add("NewType", session::toString(reportedType));
    if (expectedTransition) {
      m_processManager.logContext().log(log::DEBUG,
          "In DriveHandler::processScheduling(): state change.");
    } else {
      m_processManager.logContext().log(log::WARNING,
          "In DriveHandler::processScheduling(): unexpected previous state/type.");
    }
  }
  m_sessionState = reportedState;
  m_sessionType = reportedType;
  m_sessionVid = "";
  m_lastStateChangeTime = std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout = nextTimeout();
  return m_processingStatus;
}

//------------------------------------------------------------------------------
// DriveHandler::processChecking
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus DriveHandler::processChecking(serializers::WatchdogMessage& message) {
  // The subprocess reports that it is checking the drive. We expect to come from StartingUp
  // with an undetermined type, and the new session type to be Cleanup.
  // As usual, deviations are only logged: the subprocess has the last word.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  const auto reportedState = static_cast<SessionState>(message.sessionstate());
  const auto reportedType = static_cast<SessionType>(message.sessiontype());
  const bool expectedTransition = (m_sessionState == SessionState::StartingUp)
      && (m_sessionType == SessionType::Undetermined)
      && (reportedType == SessionType::Cleanup);
  if (!expectedTransition || m_sessionState != SessionState::Checking) {
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(reportedState))
          .add("NewType", session::toString(reportedType));
    if (expectedTransition) {
      // A genuine state change is worth logging (at least in debug mode).
      m_processManager.logContext().log(log::DEBUG,
          "In DriveHandler::processChecking(): state change.");
    } else {
      m_processManager.logContext().log(log::WARNING,
          "In DriveHandler::processChecking(): unexpected previous state/type.");
    }
  }
  m_sessionState = reportedState;
  m_sessionType = reportedType;
  m_sessionVid = "";
  // Arm the timeout for the new state.
  m_lastStateChangeTime = std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout = nextTimeout();
  return m_processingStatus;
}

471
472
473
//------------------------------------------------------------------------------
// DriveHandler::processMounting
//------------------------------------------------------------------------------
474
SubprocessHandler::ProcessingStatus DriveHandler::processMounting(serializers::WatchdogMessage& message) {
  // The only transition expected is from Scheduling (with an undetermined type) into an
  // Archive, Retrieve or Label session. The reported VID is recorded.
  // As usual, the subprocess has the last word: deviations are only logged.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  std::set<SessionType> expectedNewTypes= { SessionType::Archive, SessionType::Retrieve, SessionType::Label };
  if (m_sessionState!=SessionState::Scheduling ||
      m_sessionType!=SessionType::Undetermined||
      !expectedNewTypes.count(static_cast<SessionType>(message.sessiontype()))) {
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(static_cast<SessionState>(message.sessionstate())))
          .add("NewType", session::toString(static_cast<SessionType>(message.sessiontype())));
    m_processManager.logContext().log(log::WARNING,
        "In DriveHandler::processMounting(): unexpected previous state/type.");
  } else if (m_sessionState != SessionState::Mounting) {
    // If we see a session state change, it's worth logging (at least in debug mode).
    // (Compare against this handler's own state, not the copy-pasted Checking.)
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(static_cast<SessionState>(message.sessionstate())))
          .add("NewType", session::toString(static_cast<SessionType>(message.sessiontype())))
          .add("tapeVid", message.vid());
    m_processManager.logContext().log(log::DEBUG,
        "In DriveHandler::processMounting(): state change.");
  }
  m_sessionState=static_cast<SessionState>(message.sessionstate());
  m_sessionType=static_cast<SessionType>(message.sessiontype());
  m_sessionVid=message.vid();
  // Set the timeout for this state
  m_lastStateChangeTime=std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout=nextTimeout();
  return m_processingStatus;
}

508
509
510
//------------------------------------------------------------------------------
// DriveHandler::processRunning
//------------------------------------------------------------------------------
511
512
SubprocessHandler::ProcessingStatus DriveHandler::processRunning(serializers::WatchdogMessage& message) {
  // This status can be reported repeatedly (or we can transition from the previous one: Mounting).
  // We expect the type not to change (and to be in the right range).
  // As usual, the subprocess has the last word: deviations are only logged.
  // On an actual state change, the state change, data movement and heartbeat times are reset.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  std::set<SessionState> expectedStates = { SessionState::Mounting, SessionState::Running };
  std::set<SessionType> expectedTypes = { SessionType::Archive, SessionType::Retrieve, SessionType::Label };
  if (!expectedStates.count(m_sessionState) ||
      !expectedTypes.count(m_sessionType)||
      (m_sessionType!=static_cast<SessionType>(message.sessiontype()))) {
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(static_cast<SessionState>(message.sessionstate())))
          .add("NewType", session::toString(static_cast<SessionType>(message.sessiontype())));
    m_processManager.logContext().log(log::WARNING,
        "In DriveHandler::processRunning(): unexpected previous state/type.");
  } else if (m_sessionState != SessionState::Running) {
    // Only log a real transition (Mounting -> Running), not every repeated Running report.
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(static_cast<SessionState>(message.sessionstate())))
          .add("NewType", session::toString(static_cast<SessionType>(message.sessiontype())));
    m_processManager.logContext().log(log::DEBUG,
        "In DriveHandler::processRunning(): state change.");
  }
  // Record session state change time and reset other counters.
  if (m_sessionState!=static_cast<SessionState>(message.sessionstate())) {
    m_lastStateChangeTime=std::chrono::steady_clock::now();
    m_lastDataMovementTime=std::chrono::steady_clock::now();
    m_lastHeartBeatTime=std::chrono::steady_clock::now();
  }
  // Record the state in all cases. Child process knows better.
  m_sessionState=static_cast<SessionState>(message.sessionstate());
  m_sessionType=static_cast<SessionType>(message.sessiontype());
  m_sessionVid=message.vid();
  // We can now compute the timeout and check for potential exceeding of timeout
  m_processingStatus.nextTimeout = nextTimeout();
  return m_processingStatus;
}

552
553
554
//------------------------------------------------------------------------------
// DriveHandler::processUnmounting
//------------------------------------------------------------------------------
555
SubprocessHandler::ProcessingStatus DriveHandler::processUnmounting(serializers::WatchdogMessage& message) {
  // Unmounting is expected either from Running (Archive, Retrieve or Label sessions) or
  // from Checking in the case of a Cleanup session.
  // As usual, deviations are only logged: the subprocess has the last word.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  const auto reportedState = static_cast<SessionState>(message.sessionstate());
  const auto reportedType = static_cast<SessionType>(message.sessiontype());
  const bool fromRunningSession = (m_sessionState == SessionState::Running) &&
      (m_sessionType == SessionType::Archive ||
       m_sessionType == SessionType::Retrieve ||
       m_sessionType == SessionType::Label);
  const bool fromCleanupCheck = (m_sessionState == SessionState::Checking) &&
      (m_sessionType == SessionType::Cleanup);
  if (!fromRunningSession && !fromCleanupCheck) {
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(reportedState))
          .add("NewType", session::toString(reportedType));
    m_processManager.logContext().log(log::WARNING,
        "In DriveHandler::processUnmounting(): unexpected previous state/type.");
  }
  m_sessionState = reportedState;
  m_sessionType = reportedType;
  m_sessionVid = message.vid();
  // Arm the timeout for the new state.
  m_lastStateChangeTime = std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout = nextTimeout();
  return m_processingStatus;
}

586
587
588
//------------------------------------------------------------------------------
// DriveHandler::processDrainingToDisk
//------------------------------------------------------------------------------
589
SubprocessHandler::ProcessingStatus DriveHandler::processDrainingToDisk(serializers::WatchdogMessage& message) {
  // Draining to disk is only expected right after a retrieve session unmounted
  // its tape. Anything else is logged, but the subprocess' report still wins.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  const auto reportedState = static_cast<SessionState>(message.sessionstate());
  const auto reportedType = static_cast<SessionType>(message.sessiontype());
  const bool afterRetrieveUnmount = (SessionState::Unmounting == m_sessionState) &&
      (SessionType::Retrieve == m_sessionType);
  if (!afterRetrieveUnmount) {
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString(reportedState))
          .add("NewType", session::toString(reportedType));
    m_processManager.logContext().log(log::WARNING,
        "In DriveHandler::processDrainingToDisk(): unexpected previous state/type.");
  }
  // Record the new state; the tape is no longer associated with the session.
  m_sessionState = reportedState;
  m_sessionType = reportedType;
  m_sessionVid = "";
  // Restart the clock for this state's timeout.
  m_lastStateChangeTime = std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout = nextTimeout();
  return m_processingStatus;
}

612
613
614
615
616
617
//------------------------------------------------------------------------------
// DriveHandler::processShutingDown
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus DriveHandler::processShutingDown(serializers::WatchdogMessage& message) {
  // Shutdown is expected after Unmounting, or after DrainingToDisk (the extra
  // step of retrieve sessions), whatever the session type. As usual, the
  // subprocess has the last word: an unexpected previous state is only logged.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  const bool expectedPreviousState = SessionState::Unmounting == m_sessionState ||
      SessionState::DrainingToDisk == m_sessionState;
  if (!expectedPreviousState) {
    params.add("PreviousState", session::toString(m_sessionState))
          .add("PreviousType", session::toString(m_sessionType))
          .add("NewState", session::toString((SessionState)message.sessionstate()))
          .add("NewType", session::toString((SessionType)message.sessiontype()));
    m_processManager.logContext().log(log::WARNING,
        "In DriveHandler::processShutingDown(): unexpected previous state/type.");
  }
  m_sessionState=(SessionState)message.sessionstate();
  m_sessionType=(SessionType)message.sessiontype();
  m_sessionVid="";
  // Set the timeout for this state
  m_lastStateChangeTime=std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout=nextTimeout();
  return m_processingStatus;
}

638
639
640
641
642
643
//------------------------------------------------------------------------------
// DriveHandler::processFatal
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus DriveHandler::processFatal(serializers::WatchdogMessage& message) {
  // The subprocess reports that no session can run (e.g. central storage is
  // unavailable): record the reported state and request a server shutdown.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  const auto reportedState = static_cast<SessionState>(message.sessionstate());
  const auto reportedType = static_cast<SessionType>(message.sessiontype());
  m_sessionState = reportedState;
  m_sessionType = reportedType;
  m_sessionVid = "";
  // Restart the clock for this state's timeout, then flag the shutdown.
  m_lastStateChangeTime = std::chrono::steady_clock::now();
  m_processingStatus.nextTimeout = nextTimeout();
  m_processingStatus.shutdownRequested = true;
  m_processManager.logContext().log(log::CRIT,
      "In DriveHandler::processFatal(): shutting down after fatal failure.");
  return m_processingStatus;
}
657
658
659
//------------------------------------------------------------------------------
// DriveHandler::processLogs
//------------------------------------------------------------------------------
660
661
662
663
664
665
666
667
668
669
void DriveHandler::processLogs(serializers::WatchdogMessage& message) {
  // Merge the log parameters the subprocess added into the end-of-session
  // context (replacing any previous value with the same name)...
  for (const auto& added : message.addedlogparams()) {
    m_sessionEndContext.pushOrReplace({added.name(), added.value()});
  }
  // ...then drop the parameters it asked to remove.
  for (const auto& removed : message.deletedlogparams()) {
    m_sessionEndContext.erase(removed);
  }
}

670
671
672
673
//------------------------------------------------------------------------------
// DriveHandler::processBytes
//------------------------------------------------------------------------------
void DriveHandler::processBytes(serializers::WatchdogMessage& message) {
  // Update the heartbeat and data-movement bookkeeping from a watchdog message
  // carrying the subprocess' cumulative tape/disk byte counters.

  // In all cases, this is a heartbeat.
  m_lastHeartBeatTime=std::chrono::steady_clock::now();

  // Record data moved totals if needed. Any change of the counters counts as
  // data movement and restarts the data-movement clock.
  if (m_totalTapeBytesMoved != message.totaltapebytesmoved() ||
      m_totalDiskBytesMoved != message.totaldiskbytesmoved()) {
    // Counters are cumulative: a decrease is suspicious, so report it (but still
    // adopt the new values, the subprocess having the last word).
    if (message.totaltapebytesmoved() < m_totalTapeBytesMoved ||
        message.totaldiskbytesmoved() < m_totalDiskBytesMoved) {
      log::ScopedParamContainer params(m_processManager.logContext());
      params.add("tapeDrive", m_configLine.unitName)
            .add("PreviousTapeBytesMoved", m_totalTapeBytesMoved)
            .add("PreviousDiskBytesMoved", m_totalDiskBytesMoved)
            .add("NewTapeBytesMoved", message.totaltapebytesmoved())
            .add("NewDiskBytesMoved", message.totaldiskbytesmoved());
      m_processManager.logContext().log(log::WARNING, "In DriveHandler::processBytes(): total bytes moved going backwards");
    }
    m_totalTapeBytesMoved=message.totaltapebytesmoved();
    m_totalDiskBytesMoved=message.totaldiskbytesmoved();
    m_lastDataMovementTime=std::chrono::steady_clock::now();
  }

  // Update next timeout if required. Next operations might not do it.
  m_processingStatus.nextTimeout=nextTimeout();
}

698
699
700
//------------------------------------------------------------------------------
// DriveHandler::processSigChild
//------------------------------------------------------------------------------
701
702
703
704
705
SubprocessHandler::ProcessingStatus DriveHandler::processSigChild() {
  // Check out the child process's status with a non-blocking waitpid():
  //  - 0: the child process is still around;
  //  - the child's pid: it completed and its status has been picked up;
  //  - -1: an error occurred (e.g. the process is entirely gone).
  // Of course we might not have a child process to begin with.
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  if (-1 == m_pid) return m_processingStatus;
  int processStatus = 0;
  int rc=::waitpid(m_pid, &processStatus, WNOHANG);
  // Check there was no error.
  try {
    exception::Errnum::throwOnMinusOne(rc);
  } catch (exception::Exception &ex) {
    // Extend the function-level container instead of shadowing it with a nested
    // one (which also re-added "tapeDrive").
    params.add("pid", m_pid)
          .add("Message", ex.getMessageValue())
          .add("SessionState", session::toString(m_sessionState))
          .add("SessionType", session::toString(m_sessionType));
    m_processManager.logContext().log(log::WARNING,
        "In DriveHandler::processSigChild(): failed to get child process exit code. Doing nothing as we are unable to determine if it is still running or not.");
    return m_processingStatus;
  }
  if (rc) {
    // It was our process. In all cases we prepare the space for the new session
    // We did collect the exit code of our child process
    // How well did it finish? (exit() or killed?)
    // The socket pair will be reopened on the next fork. Clean it up.
    if (m_socketPair.get()) {
      m_processManager.removeFile(m_socketPair->getFdForAccess(server::SocketPair::Side::child));
      m_socketPair.reset(nullptr);
    }
    params.add("pid", m_pid);
    if (WIFEXITED(processStatus)) {
      // Child process exited properly. The new child process will not need to start
      // a cleanup session.
      params.add("exitCode", WEXITSTATUS(processStatus));
      // If we are shutting down, we should not request a new session.
      if (m_sessionState != SessionState::Shutdown) {
        m_processManager.logContext().log(log::INFO, "Drive subprocess exited. Will spawn a new one.");
        resetToDefault(PreviousSession::Up);
        m_processingStatus.forkRequested=true;
      } else {
        m_processManager.logContext().log(log::INFO, "Drive subprocess exited. Will not spawn new one as we are shutting down.");
        m_processingStatus.forkRequested=false;
      }
    } else {
      params.add("IfSignaled", WIFSIGNALED(processStatus))
            .add("TermSignal", WTERMSIG(processStatus))
            .add("CoreDump", WCOREDUMP(processStatus));
      // Decide about respawning BEFORE resetToDefault(): like the exited branch
      // above, the decision must be based on the state of the session that just
      // ended, which the reset may overwrite.
      const bool shuttingDown = (m_sessionState == SessionState::Shutdown);
      // Record the status of the session to decide whether we will run a cleaner on the next one.
      m_previousState = m_sessionState;
      m_previousType = m_sessionType;
      m_previousVid = m_sessionVid;
      resetToDefault(PreviousSession::Crashed);
      // If we are shutting down, we should not request a new session.
      if (!shuttingDown) {
        m_processManager.logContext().log(log::INFO, "Drive subprocess crashed. Will spawn a new one.");
        m_processingStatus.forkRequested=true;
      } else {
        m_processManager.logContext().log(log::INFO, "Drive subprocess crashed. Will not spawn new one as we are shutting down.");
        m_processingStatus.forkRequested=false;
      }
      m_sessionEndContext.pushOrReplace({"Error_sessionKilled", "1"});
      m_sessionEndContext.pushOrReplace({"killSignal", WTERMSIG(processStatus)});
      m_sessionEndContext.pushOrReplace({"status", "failure"});
    }
    // In all cases we log the end of the session.
    m_sessionEndContext.pushOrReplace({"tapeDrive",m_configLine.unitName});
    m_sessionEndContext.moveToTheEndIfPresent("status");
    m_sessionEndContext.log(cta::log::INFO, "Tape session finished");
    m_sessionEndContext.clear();
    // And record we do not have a process anymore.
    m_pid = -1;
  }
  return m_processingStatus;
}

780
781
782
//------------------------------------------------------------------------------
// DriveHandler::processTimeout
//------------------------------------------------------------------------------
783
SubprocessHandler::ProcessingStatus DriveHandler::processTimeout() {
  // Process manager found that we timed out. Let's log why and kill the child process,
  // if any (there should be one).
  log::ScopedParamContainer params(m_processManager.logContext());
  params.add("tapeDrive", m_configLine.unitName);
  if (-1 == m_pid) {
    m_processManager.logContext().log(log::ERR, "In DriveHandler::processTimeout(): Received timeout without child process present.");
    m_processManager.logContext().log(log::INFO, "Re-launching child process.");
    // Record the status of the session for cleanup startup (not needed here)
    m_previousState = SessionState::Shutdown;
    m_previousType = SessionType::Undetermined;
    m_previousVid = "";
    resetToDefault(PreviousSession::Crashed);
    // Request the fork only after the reset (same order as in processSigChild()),
    // so that the reset cannot undo the request.
    m_processingStatus.forkRequested=true;
    m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
    return m_processingStatus;
  }
  auto now = std::chrono::steady_clock::now();
  params.add("SessionState", session::toString(m_sessionState))
        .add("SessionType", session::toString(m_sessionType))
        .add("TimeoutType", m_timeoutType)
        .add("SessionTypeWhenTimeoutDecided", session::toString(m_sessionTypeWhenTimeoutDecided))
        .add("SessionStateWhenTimeoutDecided", session::toString(m_sessionStateWhenTimeoutDecided))
        .add("LastDataMovementTime", std::chrono::duration_cast<std::chrono::seconds>(m_lastDataMovementTime.time_since_epoch()).count())
        .add("LastHeartbeatTime", std::chrono::duration_cast<std::chrono::seconds>(m_lastHeartBeatTime.time_since_epoch()).count())
        .add("LastStateChangeTime", std::chrono::duration_cast<std::chrono::seconds>(m_lastStateChangeTime.time_since_epoch()).count())
        .add("Now", std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count())
        .add("ThisTimeout", std::chrono::duration_cast<std::chrono::seconds>(m_processingStatus.nextTimeout.time_since_epoch()).count());
  // Log the time left before each kind of timeout. A state without a configured
  // timeout of a given kind makes at() throw, and that entry is simply skipped.
  try {
    decltype (SubprocessHandler::ProcessingStatus::nextTimeout) nextTimeout = 
        m_lastStateChangeTime + m_stateChangeTimeouts.at(m_sessionState);
    std::chrono::duration<double> timeToTimeout = nextTimeout - now;
    params.add("BeforeStateChangeTimeout_s", timeToTimeout.count());
  } catch (...) {}
  try {
    decltype (SubprocessHandler::ProcessingStatus::nextTimeout) nextTimeout = 
        m_lastDataMovementTime + m_dataMovementTimeouts.at(m_sessionState);
    std::chrono::duration<double> timeToTimeout = nextTimeout - now;
    params.add("BeforeDataMovementTimeout_s", timeToTimeout.count());
  } catch (...) {}
  try {
    decltype (SubprocessHandler::ProcessingStatus::nextTimeout) nextTimeout = 
        m_lastHeartBeatTime + m_heartbeatTimeouts.at(m_sessionState);
    std::chrono::duration<double> timeToTimeout = nextTimeout - now;
    params.add("BeforeHeartbeatTimeout_s", timeToTimeout.count());
  } catch (...) {}
  try {
    params.add("SubprocessId", m_pid);
    exception::Errnum::throwOnMinusOne(::kill(m_pid, SIGKILL));
    m_processManager.logContext().log(log::WARNING, "In DriveHandler::processTimeout(): Killed subprocess.");
  } catch (exception::Exception & ex) {
    params.add("Error", ex.getMessageValue());
    m_processManager.logContext().log(log::ERR, "In DriveHandler::processTimeout(): Failed to kill subprocess.");
  }
  // We now should receive the sigchild, so we ask nothing from process manager
  m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
  return m_processingStatus;
}

843
844
845
//------------------------------------------------------------------------------
// DriveHandler::runChild
//------------------------------------------------------------------------------
846
int DriveHandler::runChild() {
847
848
  // We are in the child process. It is time to open connections to the catalogue
  // and object store, and run the session.
849
  // If the previous session crashed, and a tape is potentially in the drive (after
850
  // report of starting to mount and before a report of successful unmount),
851
  // this session instance will be a cleaner session.
852
853
854
  // This condition is detected with a non empty m_previousVID and the corresponding
  // m_previousState and m_PreviousType.
  // Finally, on a crashed cleaner session, we will put the drive down.
855
856
857
858
  // A non crashed session which failed to unmount the tape will have put itself
  // down (and already failed to unmount on its own).
  // Otherwise, this session will run a regular data transfer session which will
  // schedule itself info an empty drive probe, archive, retrieve or label session.
859
  
Eric Cano's avatar
Eric Cano committed
860
861
862
863
864
  // Set the thread name for process ID:
  std::string threadName = "cta-tpd-";
  threadName+=m_configLine.unitName;
  prctl(PR_SET_NAME, threadName.c_str());
  
865
866
867
  // Create the channel to talk back to the parent process.
  cta::tape::daemon::DriveHandlerProxy driveHandlerProxy(*m_socketPair);
  
868
869
  std::string hostname=cta::utils::getShortHostname();
  
870
871
872
  auto &lc=m_processManager.logContext();
  {
    log::ScopedParamContainer params(lc);
873
    params.add("backendPath", m_tapedConfig.backendPath.value());
874
875
    lc.log(log::DEBUG, "In DriveHandler::runChild(): will connect to object store backend.");
  }
876
  // Before anything, we need to check we have access to the scheduler's central storages.
877
878
879
880
881
882
883
884
885
886
887
  std::unique_ptr<cta::objectstore::Backend> backend;
  try {
    backend.reset(cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), lc.logger()).release());
  } catch (cta::exception::Exception &ex) {
    log::ScopedParamContainer param (lc);
    param.add("errorMessage", ex.getMessageValue());
    lc.log(log::CRIT, "In DriveHandler::runChild(): failed to connect to objectstore. Reporting fatal error.");
    driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
    sleep(1);
    return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
  }
888
889
890
891
892
893
894
895
896
897
  // If the backend is a VFS, make sure we don't delete it on exit.
  // If not, nevermind.
  try {
    dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
  } catch (std::bad_cast &){}
  // Create the agent entry in the object store. This could fail (even before ping, so
  // handle failure like a ping failure).
  std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator;
  std::unique_ptr<cta::OStoreDBWithAgent> osdb;
  try {
898
899
    std::string processName="DriveProcess-";
    processName+=m_configLine.unitName;
900
901
    log::ScopedParamContainer params(lc);
    params.add("processName", processName);
902
    lc.log(log::DEBUG, "In DriveHandler::runChild(): will create agent entry. Enabling leaving non-empty agent behind.");
903
    backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, processName, lc));
904
    backendPopulator->leaveNonEmptyAgentsBehind();
905
  } catch(cta::exception::Exception &ex) {
906
    log::ScopedParamContainer param(lc);
907
    param.add("errorMessage", ex.getMessageValue());
908
    lc.log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate agent entry. Reporting fatal error.");
909
910
911
912
913
    driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
    sleep(1);
    return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
  }
  try {
914
915
916
    if(!m_catalogue)
      m_catalogue = createCatalogue("DriveHandler::runChild()");
    osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *m_catalogue, lc.logger()));
917
  } catch(cta::exception::Exception &ex) {
918
    log::ScopedParamContainer param(lc);
919
    param.add("errorMessage", ex.getMessageValue());
920
    lc.log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate catalogue. Reporting fatal error.");
921
922
923
924
    driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
    sleep(1);
    return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
  }
925
  lc.log(log::DEBUG, "In DriveHandler::runChild(): will create scheduler.");
926
  cta::Scheduler scheduler(*m_catalogue, *osdb, m_tapedConfig.mountCriteria.value().maxFiles,
927
      m_tapedConfig.mountCriteria.value().maxBytes);
928
  // Before launching the transfer session, we validate that the scheduler is reachable.
929
  lc.log(log::DEBUG, "In DriveHandler::runChild(): will ping scheduler.");
930
  try {
931
    scheduler.ping(lc);
932
933
934
935
936
937
  } catch (const cta::catalogue::WrongSchemaVersionException &ex) {
    log::ScopedParamContainer param (lc);
    param.add("errorMessage", ex.getMessageValue());
    lc.log(log::CRIT, "In DriveHandler::runChild(): catalogue MAJOR version mismatch. Reporting fatal error.");
    driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
    return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
938
  } catch (cta::exception::Exception &ex) {
939
    log::ScopedParamContainer param (lc);
940
    param.add("errorMessage", ex.getMessageValue());
941
    lc.log(log::CRIT, "In DriveHandler::runChild(): failed to ping central storage before session. Reporting fatal error.");
942
943
944
    driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
    return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
  }
945
  
946
  lc.log(log::DEBUG, "In DriveHandler::runChild(): will start agent heartbeat.");
947
  // The object store is accessible, let's turn the agent heartbeat on.
948
  objectstore::AgentHeartbeatThread agentHeartbeat(backendPopulator->getAgentReference(), *backend, lc.logger());
949
  agentHeartbeat.startThread();
950
  
951
952
  // 1) Special case first, if we crashed in a cleaner session, we put the drive down
  if (m_previousSession == PreviousSession::Crashed && m_previousType == SessionType::Cleanup) {
953
    log::ScopedParamContainer params(lc);
954
    params.add("tapeDrive", m_configLine.unitName);
955
956
957
    int logLevel = log::ERR;
    std::string errorMsg = "In DriveHandler::runChild(): the cleaner session crashed. Putting the drive down.";
    lc.log(log::ERR, errorMsg);
958
959
960
961
    // Get hold of the scheduler.
    try {
      cta::common::dataStructures::DriveInfo driveInfo;
      driveInfo.driveName=m_configLine.unitName;
962
963
      driveInfo.logicalLibrary=m_configLine.logicalLibrary;
      driveInfo.host=hostname;
964
      scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc);
965
      cta::common::dataStructures::SecurityIdentity securityIdentity;
966
967
968
      cta::common::dataStructures::DesiredDriveState driveState;
      driveState.up = false;
      driveState.forceDown = false;
969
      driveState.setReasonFromLogMsg(logLevel,errorMsg);
970
      scheduler.setDesiredDriveState(securityIdentity, m_configLine.unitName,driveState, lc);
971
972
      return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
    } catch (cta::exception::Exception &ex) {
973
      log::ScopedParamContainer param(lc);
974
      param.add("errorMessage", ex.getMessageValue());
975
      lc.log(log::CRIT, "In DriveHandler::runChild(): failed to set the drive down. Reporting fatal error.");
976
977
978
979
980
981
      driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
      sleep(1);
      return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
    }
  }

982
  // 2) If the previous session crashed, we might want to run a cleaner session, depending
983
984
985
986
987
  // on the previous state
  std::set<SessionState> statesRequiringCleaner = { SessionState::Mounting, 
    SessionState::Running, SessionState::Unmounting };
  if (m_previousSession == PreviousSession::Crashed && statesRequiringCleaner.count(m_previousState)) {
    if (!m_previousVid.size()) {
988
989
990
      int logLevel = log::ERR;
      std::string errorMsg = "In DriveHandler::runChild(): Should run cleaner but VID is missing. Putting the drive down.";
      lc.log(log::ERR, errorMsg);
991
992
993
      try {
        cta::common::dataStructures::DriveInfo driveInfo;
        driveInfo.driveName=m_configLine.unitName;
994
995
        driveInfo.logicalLibrary=m_configLine.logicalLibrary;
        driveInfo.host=hostname;
996
        scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc);
997
        cta::common::dataStructures::SecurityIdentity securityIdentity;
998
999
1000
        cta::common::dataStructures::DesiredDriveState driveState;
        driveState.up = false;
        driveState.forceDown = false;
For faster browsing, not all history is shown. View entire blame