/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "MaintenanceHandler.hpp"
#include "common/exception/Errnum.hpp"
#include "objectstore/AgentHeartbeatThread.hpp"
#include "objectstore/BackendPopulator.hpp"
#include "objectstore/BackendFactory.hpp"
#include "objectstore/BackendVFS.hpp"
#include "objectstore/GarbageCollector.hpp"
#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp"
#include "catalogue/Catalogue.hpp"
#include "catalogue/CatalogueFactoryFactory.hpp"
#include "scheduler/Scheduler.hpp"
#include "rdbms/Login.hpp"
#include "common/make_unique.hpp"
#include "scheduler/DiskReportRunner.hpp"
#include "scheduler/RepackRequestManager.hpp"

#include <signal.h>
#include <sys/wait.h>
#include <sys/prctl.h>

namespace cta { namespace tape { namespace daemon {

//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
MaintenanceHandler::MaintenanceHandler(const TapedConfiguration& tapedConfig, ProcessManager& pm):
SubprocessHandler("garbageCollector"), m_processManager(pm), m_tapedConfig(tapedConfig) {
  // As the handler is started, its first duty is to create a new subprocess. This
  // will be managed by the process manager (initial request in getInitialStatus)
}

//------------------------------------------------------------------------------
// MaintenanceHandler::getInitialStatus
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus MaintenanceHandler::getInitialStatus() {
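  // The initial status simply requests a fork: the process manager will call fork()
  // on this handler when it processes the returned status.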
  m_processingStatus.forkRequested=true;
  return m_processingStatus;
}

//------------------------------------------------------------------------------
// MaintenanceHandler::postForkCleanup
//------------------------------------------------------------------------------
void MaintenanceHandler::postForkCleanup() {
  // We are in the child process of another handler. We can close our socket pair
  // without re-registering it from poll.
  m_socketPair.reset(nullptr);
}


//------------------------------------------------------------------------------
// MaintenanceHandler::fork
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus MaintenanceHandler::fork() {
  // If anything fails while attempting to fork, we will have to declare ourselves
  // failed and ask for a shutdown by sending the TERM signal to the parent process.
  // This ensures the shutdown-kill sequence is managed by the signal handler without code duplication.
  // Record that we no longer ask for a fork.
  m_processingStatus.forkRequested = false;
  try {
    // First prepare a socket pair for this new subprocess
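    // The socket pair is the only channel between the daemon and the subprocess:
    // the parent keeps one end and writes to it in shutdown(), while the child
    // polls the other end in runChild() and exits when a message arrives.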
    m_socketPair.reset(new cta::server::SocketPair());
    // and fork
    m_pid=::fork();
    exception::Errnum::throwOnMinusOne(m_pid, "In MaintenanceHandler::fork(): failed to fork()");
    if (!m_pid) {
      // We are in the child process
      SubprocessHandler::ProcessingStatus ret;
      ret.forkState = SubprocessHandler::ForkState::child;
      return ret;
    } else {
      // We are in the parent process
      m_processingStatus.forkState = SubprocessHandler::ForkState::parent;
      // Close child side of socket.
      m_socketPair->close(server::SocketPair::Side::child);
      // We are now ready to react to timeouts and messages from the child process.
      return m_processingStatus;
    }
  } catch (cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer params(m_processManager.logContext());
98
    m_processManager.logContext().log(log::ERR, "Failed to fork maintenance process. Initiating shutdown with SIGTERM.");
    // Wipe all previous states as we are shutting down
    m_processingStatus = SubprocessHandler::ProcessingStatus();
    m_processingStatus.shutdownComplete=true;
    m_processingStatus.forkState=SubprocessHandler::ForkState::parent;
    // Initiate shutdown
    ::kill(::getpid(), SIGTERM);
    return m_processingStatus;
  }
}

//------------------------------------------------------------------------------
// MaintenanceHandler::kill
//------------------------------------------------------------------------------
void MaintenanceHandler::kill() {
  // If we have a subprocess, kill it and wait for completion (if needed). We do not need to keep
  // track of the exit state as kill() means we will not be called anymore.
  log::ScopedParamContainer params(m_processManager.logContext());
  if (m_pid != -1) {
    params.add("SubProcessId", m_pid);
    // The socket pair will be reopened on the next fork. Clean it up.
    m_socketPair.reset(nullptr);
    try {
      exception::Errnum::throwOnMinusOne(::kill(m_pid, SIGKILL),"Failed to kill() subprocess");
      int status;
      // wait for child process exit
      exception::Errnum::throwOnMinusOne(::waitpid(m_pid, &status, 0), "Failed to waitpid() subprocess");
      // Log the status
      params.add("WIFEXITED", WIFEXITED(status));
      if (WIFEXITED(status)) {
        params.add("WEXITSTATUS", WEXITSTATUS(status));
      } else {
        params.add("WIFSIGNALED", WIFSIGNALED(status));
      } 
      m_processManager.logContext().log(log::INFO, "In MaintenanceHandler::kill(): sub process completed");
    } catch (exception::Exception & ex) {
      params.add("Exception", ex.getMessageValue());
      m_processManager.logContext().log(log::ERR, "In MaintenanceHandler::kill(): failed to kill existing subprocess");
    }
  } else {
    m_processManager.logContext().log(log::INFO, "In MaintenanceHandler::kill(): no subprocess to kill");
  }
}

//------------------------------------------------------------------------------
// MaintenanceHandler::processEvent
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus MaintenanceHandler::processEvent() {
  // We do not expect any feedback from the child process...
  m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::processEvent(): spurious event");
  return m_processingStatus;
}

//------------------------------------------------------------------------------
// MaintenanceHandler::processSigChild
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus MaintenanceHandler::processSigChild() {
  // Check on the child process's status. With WNOHANG, waitpid() returns 0 if the
  // child is still running, the child's pid if it completed (and its status needs
  // to be picked up), and -1 if the process is entirely gone.
  // Of course, we might not have a child process to begin with.
  log::ScopedParamContainer params(m_processManager.logContext());
  if (-1 == m_pid) return m_processingStatus;
  int processStatus;
  int rc=::waitpid(m_pid, &processStatus, WNOHANG);
  // Check there was no error.
  try {
    exception::Errnum::throwOnMinusOne(rc);
  } catch (exception::Exception &ex) {
    cta::log::ScopedParamContainer params(m_processManager.logContext());
    params.add("pid", m_pid)
          .add("Message", ex.getMessageValue());
    m_processManager.logContext().log(log::WARNING,
        "In MaintenanceHandler::processSigChild(): failed to get child process exit code. Doing nothing as we are unable to determine if it is still running or not.");
    return m_processingStatus;
  }
  if (rc) {
    // It was our process. In all cases we prepare the space for the new session
    // We did collect the exit code of our child process
    // How well did it finish? (exit() or killed?)
    // The socket pair will be reopened on the next fork. Clean it up.
    m_socketPair.reset(nullptr);
    params.add("pid", m_pid);
    if (WIFEXITED(processStatus)) {
      // Child process exited properly. The new child process will not need to start
      // a cleanup session.
      params.add("exitCode", WEXITSTATUS(processStatus));
      // If we are shutting down, we should not request a new session.
      if (!m_shutdownInProgress) {
        m_processManager.logContext().log(log::INFO, "Maintenance subprocess exited. Will spawn a new one.");
        m_processingStatus.forkRequested=true;
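        // Setting nextTimeout to its maximum value effectively disables the timeout
        // while a (new) subprocess is expected to be running.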
        m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
      } else {
        m_processManager.logContext().log(log::INFO, "Maintenance subprocess exited. Will not spawn new one as we are shutting down.");
        m_processingStatus.forkRequested=false;
        m_processingStatus.shutdownComplete=true;
        m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
      }
    } else {
      params.add("IfSignaled", WIFSIGNALED(processStatus))
            .add("TermSignal", WTERMSIG(processStatus))
            .add("CoreDump", WCOREDUMP(processStatus));
      // If we are shutting down, we should not request a new session.
      if (!m_shutdownInProgress) {
        m_processManager.logContext().log(log::INFO, "Maintenance subprocess crashed. Will spawn a new one.");
        m_processingStatus.forkRequested=true;
        m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
      } else {
        m_processManager.logContext().log(log::INFO, "Maintenance subprocess crashed. Will not spawn new one as we are shutting down.");
        m_processingStatus.forkRequested=false;
        m_processingStatus.shutdownComplete=true;
        m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
      }
    }
    // In all cases, we do not have a PID anymore
    m_pid=-1;
  }
  return m_processingStatus;
}

//------------------------------------------------------------------------------
// MaintenanceHandler::processTimeout
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus MaintenanceHandler::processTimeout() {
  // The only time we expect a timeout is when shutting down
  if (!m_shutdownInProgress) {
    m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::processTimeout(): spurious timeout: no shutdown");
    return m_processingStatus;
  } 
  // We were shutting down and the child process did not exit in time. Killing it.
  if (-1!=m_pid) {
    // We will help the exit of the child process by killing it.
    m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::processTimeout(): will kill subprocess");
    kill();
  } else {
    m_processManager.logContext().log(log::WARNING,
        "In MaintenanceHandler::processTimeout(): spurious timeout: no more process");
  }
  // In all cases, the shutdown is complete.
  m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
  m_processingStatus.shutdownComplete=true;
  return m_processingStatus;
}

//------------------------------------------------------------------------------
// MaintenanceHandler::runChild
//------------------------------------------------------------------------------
int MaintenanceHandler::runChild() {
  // We are in the child process. It is time to open connections to the catalogue
  // and object store, and run the maintenance tasks (garbage collection, disk
  // reporting and repack management).
  // We do not have to care for previous crashed sessions as we will garbage
  // collect them like any other crashed agent.
  
  // Set the process name (as shown by ps) for this maintenance subprocess:
  prctl(PR_SET_NAME, "cta-tpd-maint");
  
  // Before anything, we will check for access to the scheduler's central storage.
  // If we fail to access it, we cannot work. We expect the drive processes to
  // fail likewise, so we just wait for shutdown signal (no feedback to main
  // process).
  std::unique_ptr<cta::objectstore::Backend> backend(
    cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), m_processManager.logContext().logger()).release());
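  // Note: the factory interprets the configured backend string; it may resolve to a
  // local VFS directory or to another object store implementation, hence the
  // VFS-specific handling below.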
  // If the backend is a VFS, make sure we don't delete it on exit.
  // If not, nevermind.
  try {
    dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
  } catch (std::bad_cast &){}
  // Create the agent entry in the object store. This could fail (even before ping, so
  // handle failure like a ping failure).
  std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator;
  std::unique_ptr<cta::OStoreDBWithAgent> osdb;
  std::unique_ptr<cta::catalogue::Catalogue> catalogue;
  std::unique_ptr<cta::Scheduler> scheduler;
  try {
    backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, "Maintenance", m_processManager.logContext()));
    // Create the catalogue first: the object store database below takes a reference to it.
    const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value());
    const uint64_t nbConns = 1;
    const uint64_t nbArchiveFileListingConns = 0;
    auto catalogueFactory = cta::catalogue::CatalogueFactoryFactory::create(m_processManager.logContext().logger(),
      catalogueLogin, nbConns, nbArchiveFileListingConns);
    catalogue=catalogueFactory->create();
    osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *catalogue, m_processManager.logContext().logger()));
    // TODO: the mount policy parameters are hardcoded here temporarily; they will be removed once we know where to put them.
    scheduler=make_unique<cta::Scheduler>(*catalogue, *osdb, 5, 2*1000*1000);
    // Before launching the maintenance session, we validate that the scheduler is reachable.
    scheduler->ping(m_processManager.logContext());
  } catch(cta::exception::Exception &ex) {
    {
      log::ScopedParamContainer param(m_processManager.logContext());
      param.add("errorMessage", ex.getMessageValue());
      m_processManager.logContext().log(log::CRIT, 
          "In MaintenanceHandler::runChild(): failed to contact central storage. Waiting for shutdown.");
    }
    server::SocketPair::pollMap pollList;
    pollList["0"]=m_socketPair.get();
    // Wait forever (negative timeout) for something to come from parent process.
    server::SocketPair::poll(pollList, -1, server::SocketPair::Side::parent);
    m_processManager.logContext().log(log::INFO,
        "In MaintenanceHandler::runChild(): Received shutdown message after failure to contact storage. Exiting.");
    return EXIT_FAILURE; 
  }
  
  // The object store is accessible, let's turn the agent heartbeat on.
  objectstore::AgentHeartbeatThread agentHeartbeat(backendPopulator->getAgentReference(), *backend, m_processManager.logContext().logger());
  agentHeartbeat.startThread();
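  // The heartbeat thread periodically refreshes this agent's entry in the object
  // store so that other garbage collectors do not treat this process as a dead agent.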
  
  // Create the garbage collector, the disk report runner and the repack request manager
  objectstore::GarbageCollector gc(*backend, backendPopulator->getAgentReference(), *catalogue);
  DiskReportRunner diskReportRunner(*scheduler);
  RepackRequestManager repackRequestManager(*scheduler);
  
  // Run the maintenance tasks in a loop: garbage collection, disk reporting and repack management
  try {
    server::SocketPair::pollMap pollList;
    pollList["0"]=m_socketPair.get();
    bool receivedMessage=false;
    do {
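      // Each pass runs one round of garbage collection, disk reporting and repack
      // management, then waits on the socket pair for the remainder of the poll
      // interval. Any message from the parent is treated as a shutdown request.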
      utils::Timer t;
      m_processManager.logContext().log(log::DEBUG, 
          "In MaintenanceHandler::runChild(): About to run a GC pass.");
      gc.runOnePass(m_processManager.logContext());
      diskReportRunner.runOnePass(m_processManager.logContext());
      repackRequestManager.runOnePass(m_processManager.logContext());
      try {
        server::SocketPair::poll(pollList, s_pollInterval - t.secs(), server::SocketPair::Side::parent);
        receivedMessage=true;
      } catch (server::SocketPair::Timeout & ex) {}
    } while (!receivedMessage);
    m_processManager.logContext().log(log::INFO,
        "In MaintenanceHandler::runChild(): Received shutdown message. Exiting.");
  } catch (cta::exception::Exception & ex) {
    {
      log::ScopedParamContainer params(m_processManager.logContext());
      params.add("Message", ex.getMessageValue());
      m_processManager.logContext().log(log::ERR, 
          "In MaintenanceHandler::runChild(): received an exception. Backtrace follows.");
    }
    m_processManager.logContext().logBacktrace(log::ERR, ex.backtrace());
  }
  agentHeartbeat.stopAndWaitThread();
  return EXIT_SUCCESS;
}

//------------------------------------------------------------------------------
// MaintenanceHandler::shutdown
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus MaintenanceHandler::shutdown() {
  // We will signal the shutdown to the child process by sending a byte over the 
  // socket pair (if we have one)
  m_shutdownInProgress=true;
  if (!m_socketPair.get()) {
    m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::shutdown(): no socket pair");
  } else {
    m_processManager.logContext().log(log::INFO, "In MaintenanceHandler::shutdown(): sent shutdown message to child process");
    m_socketPair->send("\0");
  }
  return m_processingStatus;
}

//------------------------------------------------------------------------------
// MaintenanceHandler::~MaintenanceHandler
//------------------------------------------------------------------------------
MaintenanceHandler::~MaintenanceHandler() {
  // If we still have a child process (should not), just stop it the hard way.
  if (-1 != m_pid) {
    cta::log::ScopedParamContainer params(m_processManager.logContext());
    params.add("pid", m_pid);
    ::kill(m_pid, SIGKILL);
    m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::~MaintenanceHandler(): killed leftover subprocess");
  }
}

}}} // namespace cta::tape::daemon