CatalogueTransferSession.cpp 16.7 KB
Newer Older
Daniele Kruse's avatar
Daniele Kruse committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
21
 * @author Castor Dev team, castor-dev@cern.ch
Daniele Kruse's avatar
Daniele Kruse committed
22
23
 *****************************************************************************/

24
25
#include "castor/common/CastorConfiguration.hpp"
#include "castor/exception/Exception.hpp"
26
27
#include "castor/legacymsg/CupvProxy.hpp"
#include "castor/legacymsg/VmgrProxy.hpp"
28
#include "castor/tape/tapeserver/daemon/CatalogueTransferSession.hpp"
29
#include "h/Ctape_constants.h"
30
#include "h/Cupv_constants.h"
31
#include "h/rmc_constants.h"
32
#include "h/vmgr_constants.h"
Daniele Kruse's avatar
Daniele Kruse committed
33

34
35
36
#include <sys/types.h>
#include <signal.h>

37
38
39
//------------------------------------------------------------------------------
// create
//------------------------------------------------------------------------------
40
41
castor::tape::tapeserver::daemon::CatalogueTransferSession*
  castor::tape::tapeserver::daemon::CatalogueTransferSession::create(
42
43
44
    log::Logger &log,
    const int netTimeout,
    const tape::utils::DriveConfig &driveConfig,
45
46
47
    const legacymsg::RtcpJobRqstMsgBody &vdqmJob,
    legacymsg::VmgrProxy &vmgr,
    legacymsg::CupvProxy &cupv,
48
    const std::string &hostName,
49
    const time_t blockMoveTimeoutInSecs,
50
51
    const unsigned short rmcPort,
    ProcessForkerProxy &processForker) {
52

53
  const pid_t pid = processForker.forkDataTransfer(driveConfig, vdqmJob);
54

55
  return new CatalogueTransferSession(
56
    log,
57
58
    netTimeout,
    pid,
59
    driveConfig,
60
61
    vdqmJob,
    vmgr,
62
    cupv,
63
64
    hostName,
    blockMoveTimeoutInSecs);
65
66
}

Daniele Kruse's avatar
Daniele Kruse committed
67
68
69
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
70
71
castor::tape::tapeserver::daemon::CatalogueTransferSession::
  CatalogueTransferSession(
72
  log::Logger &log,
73
74
  const int netTimeout,
  const pid_t pid,
75
  const tape::utils::DriveConfig &driveConfig,
76
77
  const legacymsg::RtcpJobRqstMsgBody &vdqmJob,
  legacymsg::VmgrProxy &vmgr,
78
  legacymsg::CupvProxy &cupv,
79
80
  const std::string &hostName,
  const time_t blockMoveTimeoutInSecs) throw():
81
  CatalogueSession(log, netTimeout, pid, driveConfig),
82
  m_state(TRANSFERSTATE_WAIT_JOB),
83
  m_mode(WRITE_DISABLE),
84
  m_lastTimeSomeBlocksWereMoved(time(0)),
85
  m_assignmentTime(time(0)),
86
87
  m_vdqmJob(vdqmJob),
  m_vmgr(vmgr),
88
  m_cupv(cupv),
89
90
  m_hostName(hostName),
  m_blockMoveTimeoutInSecs(blockMoveTimeoutInSecs) {
Daniele Kruse's avatar
Daniele Kruse committed
91
92
}

93
94
95
96
//------------------------------------------------------------------------------
// tick
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::tick() {
97
98
99
  const time_t now = time(0);
  const time_t secsSinceSomeBlocksWereMoved = now -
    m_lastTimeSomeBlocksWereMoved;
100
101
  const bool timeOutExceeded = secsSinceSomeBlocksWereMoved >
    m_blockMoveTimeoutInSecs;
102

103
104
105
106
  // Only execute watchdog logic when the tape has been mounted and the
  // session is running, because it is not fair to apply the watchdog logic
  // whilst a tape is being mounted
  if(TRANSFERSTATE_RUNNING == m_state && timeOutExceeded) {
107
108
109
110
111
112
113
114
115
116
117
118
119
120
    std::list<log::Param> params;
    params.push_back(log::Param("transferSessionPid", m_pid));
    params.push_back(log::Param("secsSinceSomeBlocksWereMoved",
      secsSinceSomeBlocksWereMoved));
    params.push_back(log::Param("blockMoveTimeoutInSecs",
      m_blockMoveTimeoutInSecs));
    m_log(LOG_ERR, "Killing data-transfer session because it is stuck", params);

    if(kill(m_pid, SIGKILL)) {
      const std::string errnoStr = castor::utils::errnoToString(errno);
      params.push_back(log::Param("message", errnoStr));
      m_log(LOG_ERR, "Failed to kill data-transfer session", params);
    }
  }
121
122
}

Daniele Kruse's avatar
Daniele Kruse committed
123
//------------------------------------------------------------------------------
124
// sessionSucceeded
Daniele Kruse's avatar
Daniele Kruse committed
125
//------------------------------------------------------------------------------
126
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
127
128
129
130
131
132
  sessionSucceeded() {
}

//------------------------------------------------------------------------------
// sessionFailed
//------------------------------------------------------------------------------
133
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
134
135
136
137
138
139
  sessionFailed() {
}

//------------------------------------------------------------------------------
// getAssignmentTime
//------------------------------------------------------------------------------
140
time_t castor::tape::tapeserver::daemon::CatalogueTransferSession::
141
142
143
144
  getAssignmentTime() const throw() {
  return m_assignmentTime;
}

Daniele Kruse's avatar
Daniele Kruse committed
145
146
147
//------------------------------------------------------------------------------
// getVdqmJob
//------------------------------------------------------------------------------
148
castor::legacymsg::RtcpJobRqstMsgBody castor::tape::tapeserver::daemon::
149
  CatalogueTransferSession::getVdqmJob() const{
Daniele Kruse's avatar
Daniele Kruse committed
150
151
152
  return m_vdqmJob;
}

153
154
155
//-----------------------------------------------------------------------------
// receivedRecallJob
//-----------------------------------------------------------------------------
156
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
157
  receivedRecallJob(const std::string &vid) {
158
159
  const char *const task = "accept reception of recall job";

160
161
  if(TRANSFERSTATE_WAIT_JOB != m_state) {
    castor::exception::Exception ex;
162
    ex.getMessage() << "Failed to " << task <<
163
164
165
166
167
      ": Catalogue transfer-session state-mismatch: "
      "expected=" << transferStateToStr(TRANSFERSTATE_WAIT_JOB) <<
      " actual=" << transferStateToStr(m_state);
    throw ex;
  }
168
169
170

  checkUserCanRecallFromTape(vid);

171
172
173
174
175
176
  m_state = TRANSFERSTATE_WAIT_MOUNTED;

  m_mode = WRITE_DISABLE;
  m_vid = vid;
}

177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
//-----------------------------------------------------------------------------
// checkUserCanRecallFromTape
//-----------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
  checkUserCanRecallFromTape(const std::string &vid) {
  std::list<log::Param> params;

  params.push_back(log::Param("vid", vid));
  params.push_back(log::Param("clientEuid", m_vdqmJob.clientEuid));
  params.push_back(log::Param("clientEgid", m_vdqmJob.clientEgid));

  const legacymsg::VmgrTapeInfoMsgBody vmgrTape = m_vmgr.queryTape(vid);
  params.push_back(log::Param("status",
    castor::utils::tapeStatusToString(vmgrTape.status)));
  params.push_back(log::Param("poolName", vmgrTape.poolName));
  m_log(LOG_INFO, "Queried vmgr for the tape to be recalled", params);

  if(vmgrTape.status & EXPORTED) {
    castor::exception::Exception ex;
    ex.getMessage() << "Cannot recall from an EXPORTED tape: vid=" << vid;
    throw ex;
  }

  if(vmgrTape.status & ARCHIVED) {
    castor::exception::Exception ex;
    ex.getMessage() << "Cannot recall from an ARCHIVED tape: vid=" << vid;
    throw ex;
  }

  // Only tape operators can recall from a DISABLED tape
  if(vmgrTape.status & DISABLED) {
    const bool userIsTapeOperator = m_cupv.isGranted(
      m_vdqmJob.clientEuid,
      m_vdqmJob.clientEgid,
      m_vdqmJob.clientHost,
      m_hostName,
      P_TAPE_OPERATOR);
    params.push_back(log::Param("userIsTapeOperator", userIsTapeOperator ?
      "true" : "false"));
    m_log(LOG_INFO, "Tape is DISABLED, therefore querying cupv to see if user"
      " is a tape operator", params);

    if(!userIsTapeOperator) {
      castor::exception::Exception ex;
      ex.getMessage() << "Only a tape operator can recall from a DISABLED tape"
        ": vid=" << vid;
      throw ex;
    }
  }
}

228
229
230
//-----------------------------------------------------------------------------
// receivedMigrationJob
//-----------------------------------------------------------------------------
231
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
232
233
234
235
236
237
238
239
240
  receivedMigrationJob(const std::string &vid) {
  if(TRANSFERSTATE_WAIT_JOB != m_state) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to accept reception of recall job"
      ": Catalogue transfer-session state-mismatch: "
      "expected=" << transferStateToStr(TRANSFERSTATE_WAIT_JOB) <<
      " actual=" << transferStateToStr(m_state);
    throw ex;
  }
241
242
243

  checkUserCanMigrateToTape(vid);

244
245
246
  m_state = TRANSFERSTATE_WAIT_MOUNTED;

  m_mode = WRITE_ENABLE;
Daniele Kruse's avatar
Daniele Kruse committed
247
248
249
  m_vid = vid;
}

250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
//-----------------------------------------------------------------------------
// checkUserCanMigrateToTape
//-----------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
  checkUserCanMigrateToTape(const std::string &vid) {
  std::list<log::Param> params;

  params.push_back(log::Param("vid", vid));
  params.push_back(log::Param("clientEuid", m_vdqmJob.clientEuid));
  params.push_back(log::Param("clientEgid", m_vdqmJob.clientEgid));

  const legacymsg::VmgrTapeInfoMsgBody vmgrTape = m_vmgr.queryTape(vid);
  params.push_back(log::Param("status",
    castor::utils::tapeStatusToString(vmgrTape.status)));
  params.push_back(log::Param("poolName", vmgrTape.poolName));
  m_log(LOG_INFO, "Queried vmgr for the tape for migration", params);

  if(vmgrTape.status & EXPORTED) {
    castor::exception::Exception ex;
    ex.getMessage() << "Cannot migrate files to an EXPORTED tape"
      ": vid=" << vid;
    throw ex;
  }

  if(vmgrTape.status & ARCHIVED) {
    castor::exception::Exception ex;
    ex.getMessage() << "Cannot migrate files to an ARCHIVED tape"
      ": vid=" << vid;
    throw ex;
  }

  if(vmgrTape.status & DISABLED) {
    castor::exception::Exception ex;
    ex.getMessage() << "Cannot migrate files to a DISABLED tape"
      ": vid=" << vid;
    throw ex;
  }

  const legacymsg::VmgrPoolInfoMsgBody vmgrPool =
    m_vmgr.queryPool(vmgrTape.poolName);
  params.push_back(log::Param("poolUid", vmgrPool.poolUid));
  params.push_back(log::Param("poolGid", vmgrPool.poolGid));
  m_log(LOG_INFO, "Queried vmgr for the pool of the tape for migration",
    params);

295
296
297
298
299
300
301
302
303
304
305
306
307
  // A pool has no owner if either its user or group ID is 0
  //
  // There is no such concept as a pool owned by the user root or the group root
  const bool poolHasOwner = 0 != vmgrPool.poolUid && 0 != vmgrPool.poolGid;

  if(!poolHasOwner) {
    castor::exception::Exception ex;
    ex.getMessage() <<
      "Cannot migrate files to a tape belonging to an owner-less tape-pool"
      ": vid=" << vid;
    throw ex;
  }

308
309
310
311
312
313
314
315
316
317
318
319
  // Only the owner of the pool of a tape can migrate files to that tape
  const bool userIsPoolOwner = m_vdqmJob.clientEuid == vmgrPool.poolUid &&
    m_vdqmJob.clientEgid == vmgrPool.poolGid;
  if(!userIsPoolOwner) {
    castor::exception::Exception ex;
    ex.getMessage() <<
      "Only the owner of the pool of a tape can migrate files to that tape"
      ": vid=" << vid;
    throw ex;
  }
}

Daniele Kruse's avatar
Daniele Kruse committed
320
321
322
//------------------------------------------------------------------------------
// getVid
//------------------------------------------------------------------------------
323
std::string castor::tape::tapeserver::daemon::CatalogueTransferSession::
324
325
326
327
328
329
  getVid() const {
  switch(m_state) {
  case TRANSFERSTATE_WAIT_MOUNTED:
  case TRANSFERSTATE_RUNNING:
    return m_vid;
  default:
330
331
332
333
334
335
336
    {
      castor::exception::Exception ex;
      ex.getMessage() << "Failed to get VID from catalogue transfer-session"
        ": Catalogue transfer-session is in an incompatible state: "
        " state=" << transferStateToStr(m_state);
      throw ex;
    }
337
338
339
340
341
342
  }
}

//------------------------------------------------------------------------------
// getMode
//------------------------------------------------------------------------------
343
int castor::tape::tapeserver::daemon::CatalogueTransferSession::
344
345
346
347
348
349
350
351
352
353
  getMode() const {
  switch(m_state) {
  case TRANSFERSTATE_WAIT_MOUNTED:
  case TRANSFERSTATE_RUNNING:
    return m_mode;
  default:
    {
      castor::exception::Exception ex;
      ex.getMessage() << "Failed to get access mode from catalogue"
        " transfer-session"
354
        ": Catalogue transfer-session is in an incompatible state: "
355
356
357
358
359
360
        " state=" << transferStateToStr(m_state);
      throw ex;
    }
  }
}

361
362
363
//-----------------------------------------------------------------------------
// getPid
//-----------------------------------------------------------------------------
364
pid_t castor::tape::tapeserver::daemon::CatalogueTransferSession::
365
366
367
368
  getPid() const throw() {
  return m_pid;
}

369
370
371
//-----------------------------------------------------------------------------
// tapeMountedForMigration
//-----------------------------------------------------------------------------
372
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
  tapeMountedForMigration(const std::string &vid) {
  if(TRANSFERSTATE_WAIT_MOUNTED != m_state) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to accept tape mounted for migration"
      ": Catalogue transfer-session state-mismatch: "
      "expected=" << transferStateToStr(TRANSFERSTATE_WAIT_MOUNTED) <<
      " actual=" << transferStateToStr(m_state);
    throw ex;
  }

  // If the volume identifier of the data transfer job does not match the
  // mounted tape
  if(m_vid != vid) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to accept tape mounted for migration"
      ": VID mismatch: expected=" << m_vid << " actual=" << vid;
    throw ex;
  }

392
  // If the mount is not for migration
393
394
395
396
397
398
  if(WRITE_ENABLE != m_mode) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to accept tape mounted for migration"
      ": Data transfer job is not for migration";
    throw ex;
  }
399

400
  m_lastTimeSomeBlocksWereMoved = time(0); // Start watchdog timer
401
  m_state = TRANSFERSTATE_RUNNING;
402
403
404
405
406
}

//-----------------------------------------------------------------------------
// tapeMountedForRecall
//-----------------------------------------------------------------------------
407
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
408
  tapeMountedForRecall(const std::string &vid) {
409

410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
  if(TRANSFERSTATE_WAIT_MOUNTED != m_state) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to accept tape mounted for recall"
      ": Catalogue transfer-session state-mismatch: "
      "expected=" << transferStateToStr(TRANSFERSTATE_WAIT_MOUNTED) <<
      " actual=" << transferStateToStr(m_state);
    throw ex;
  }

  // If the volume identifier of the data transfer job does not match the
  // mounted tape
  if(m_vid != vid) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to accept tape mounted for recall"
      ": VID mismatch: expected=" << m_vid << " actual=" << vid;
    throw ex;
  }

428
  // If the mount is not for recall
429
430
431
432
433
434
  if(WRITE_DISABLE != m_mode) {
    castor::exception::Exception ex;
    ex.getMessage() << "Failed to accept tape mounted for recall"
      ": Data transfer job is not for recall";
    throw ex;
  }
435

436
  m_lastTimeSomeBlocksWereMoved = time(0); // Start watchdog timer
437
  m_state = TRANSFERSTATE_RUNNING;
438
439
440
441
442
}

//-----------------------------------------------------------------------------
// transferStateToStr
//-----------------------------------------------------------------------------
443
const char *castor::tape::tapeserver::daemon::CatalogueTransferSession::
444
445
446
447
448
449
450
451
452
453
454
455
  transferStateToStr(const TransferState state) const throw() {
  switch(state) {
  case TRANSFERSTATE_WAIT_JOB    : return "WAIT_JOB";
  case TRANSFERSTATE_WAIT_MOUNTED: return "WAIT_MOUNTED";
  case TRANSFERSTATE_RUNNING     : return "RUNNING";
  default                        : return "UNKNOWN";
  }
}

//-----------------------------------------------------------------------------
// tapeIsBeingMounted
//-----------------------------------------------------------------------------
456
bool castor::tape::tapeserver::daemon::CatalogueTransferSession::
457
458
  tapeIsBeingMounted() const throw() {
  return TRANSFERSTATE_WAIT_MOUNTED == m_state;
Daniele Kruse's avatar
Daniele Kruse committed
459
}
460
461
462
463
464
465

//-----------------------------------------------------------------------------
// receivedHeartbeat
//-----------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
  receivedHeartbeat(const uint64_t nbBlocksMoved) {
466
467
468
  if(nbBlocksMoved > 0) {
    m_lastTimeSomeBlocksWereMoved = time(0);
  }
469
}