EosImportFiles.cpp 16.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*!
 * @project        The CERN Tape Archive (CTA)
 * @brief          Inject file metadata into EOS using gRPC
 * @copyright      Copyright 2019 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include <sys/stat.h>
#include <stdexcept>

#include <XrdSsiPbConfig.hpp>
#include <XrdSsiPbLog.hpp>

#include <rdbms/Login.hpp>
#include "EosImportFiles.hpp"
#include "GrpcUtils.hpp"
25
26
27
28
29


namespace cta {
namespace migration {

30
/*!
 * Set up the importer from a configuration file.
 *
 * Reads the castor.* and eos.* options, selects the source table, opens the database connection
 * pool and the EOS gRPC client, and applies defaults for every option which is not set.
 *
 * @param[in]    configfile    Configuration file, parsed with XrdSsiPb::Config
 * @param[in]    retry         true to read files from CTAFILESFAILED, false to read them from
 *                             CTAFILESHELPER
 *
 * @throws std::runtime_error if castor.db_login is not set in the configuration file
 */
EosImportFiles::EosImportFiles(const std::string &configfile, bool retry) :
  m_start_time(std::chrono::steady_clock::now()),
  m_total_files(0),
  m_is_retry(retry)
{
  // Parse configuration file
  XrdSsiPb::Config config(configfile);

  auto is_json       = config.getOptionValueBool("castor.json");
  auto dbconn        = config.getOptionValueStr("castor.db_login");
  auto max_num_conns = config.getOptionValueInt("castor.max_num_connections");
  auto batch_size    = config.getOptionValueInt("castor.batch_size");
  auto castor_prefix = config.getOptionValueStr("castor.prefix");
  auto is_dry_run    = config.getOptionValueBool("eos.dry_run");
  auto eos_prefix    = config.getOptionValueStr("eos.prefix");
  auto endpoint      = config.getOptionValueStr("eos.endpoint");
  auto token         = config.getOptionValueStr("eos.token");

  // Determine which DB table to use
  m_tableName = retry ? "CTAFILESFAILED" : "CTAFILESHELPER";

  // Connect to Oracle
  if(!dbconn.first) {
    throw std::runtime_error("castor.db_login must be specified in the config file in the form oracle:user/password@TNS");
  }
  const auto dbLogin = rdbms::Login::parseString(dbconn.second);
  const uint64_t min_num_conns = 2;  // One connection for directory select and one for storage classes
  uint64_t actual_max_num_conns = min_num_conns;
  // Check the sign before converting: a negative setting cast directly to uint64_t would wrap
  // around to an enormous pool size instead of being rejected
  if(max_num_conns.first && max_num_conns.second > 0 &&
     static_cast<uint64_t>(max_num_conns.second) > min_num_conns) {
    actual_max_num_conns = static_cast<uint64_t>(max_num_conns.second);
  }
  m_dbConnPool = ::cta::make_unique<rdbms::ConnPool>(dbLogin, actual_max_num_conns);
  m_dbConn = m_dbConnPool->getConn();

  // Connect to EOS
  m_eosgrpc = eos::client::GrpcClient::Create(endpoint.first ? endpoint.second : "localhost:50051", token.second);

  // Set parameters and defaults
  m_is_json       = is_json.first       ? is_json.second       : false;
  m_is_dry_run    = is_dry_run.first    ? is_dry_run.second    : false;
  // A batch size of zero would make getNextBatch() return an empty batch, which processBatch()
  // treats as "no more files"; fall back to the default for zero or negative values
  m_batch_size    = (batch_size.first && batch_size.second > 0) ? batch_size.second : 10000;
  m_castor_prefix = castor_prefix.first ? castor_prefix.second : "/castor/cern.ch/";
  m_eos_prefix    = eos_prefix.first    ? eos_prefix.second    : "/eos/grpc/";
  // enforce a slash at beginning and end of prefixes
  eos::client::checkPrefix(m_castor_prefix);
  eos::client::checkPrefix(m_eos_prefix);
}


83
84
void EosImportFiles::listFailureModes()
{
85
86
87
88
89
90
91
92
  const std::string sql = "SELECT"
    "  RETC,"
    "  REGEXP_REPLACE(MESSAGE, 'id=[0-9]*', 'id=<id>') AS MSG,"
    "  COUNT(*) AS CNT"
    "  FROM CTAFILESFAILED"
    "  GROUP BY RETC,"
    "  REGEXP_REPLACE(MESSAGE, 'id=[0-9]*', 'id=<id>')"
    "  ORDER BY RETC, MSG";
93
94
95
96
97
98
99
100
101
102
103
104
  auto stmt = m_dbConn.createStmt(sql);
  auto rset = stmt.executeQuery();

  while(rset.next()) {
    std::cout << rset.columnString("MSG") << " (retc=" << rset.columnUint64("RETC")
              << ", count=" << rset.columnUint64("CNT") << ")" << std::endl;
  }
}


/*!
 * Print every row of CTAFILESFAILED to stdout.
 *
 * Each line shows the file id, the full path (directory path from CTADIRSHELPER joined with the
 * filename), the failure message and the return code.
 */
void EosImportFiles::listFailures()
{
  const std::string sql = "SELECT"
    "  A.FILEID, B.PATH, A.FILENAME, A.RETC, A.MESSAGE"
    "  FROM CTAFILESFAILED A, CTADIRSHELPER B"
    "  WHERE A.PARENT_FILEID = B.FILEID";

  // The statement is deliberately local: m_selectFilesDbStmt belongs to getFileMetadata() and
  // must not be overwritten by this listing query
  auto stmt = m_dbConn.createStmt(sql);
  auto rset = stmt.executeQuery();

  while(rset.next()) {
    std::cout << rset.columnString("FILEID") << "  "
              << rset.columnString("PATH") << "/" << rset.columnString("FILENAME") << "  "
              << rset.columnString("MESSAGE") << " (retc=" << rset.columnUint64("RETC") << ")" << std::endl;
  }
}


void EosImportFiles::forgetFailures()
{
123
  const std::string sql = "DELETE FROM CTAFILESFAILED";
124
  auto stmt = m_dbConn.createStmt(sql);
125
  std::cerr << "Deleting from CTAFILESFAILED...";
126
127
128
129
130
  stmt.executeNonQuery();
  std::cerr << "done." << std::endl;
}


131
132
133
void EosImportFiles::getStorageClasses()
{
  std::cerr << "Reading storage classes from CASTOR DB...";
134
135
136
137
138
139
140
141

  const std::string sql = "SELECT CLASSID, NAME FROM CNS_CLASS_METADATA";
  auto conn = m_dbConnPool->getConn();
  auto stmt = conn.createStmt(sql);
  auto rset = stmt.executeQuery();

  while(rset.next()) {
    const auto sc = std::make_pair(rset.columnUint64("CLASSID"), rset.columnString("NAME"));
142
143
    m_storageClass.insert(sc);
  }
144
145
146
147
148
149
150

  // Store reverse lookup as well (for error handling)
  for(auto &sc : m_storageClass) {
    const auto sc_rev = std::make_pair(sc.second, std::to_string(sc.first));
    m_storageClassRev.insert(sc_rev);
  }

151
152
153
154
  std::cerr << "(" << m_storageClass.size() << ")...done." << std::endl;
}


155
156
/*!
 * Start the SELECT which supplies the files to import.
 *
 * The query joins the source table (m_tableName) with CTADIRSHELPER to obtain the directory path
 * of each file. The statement and its result set are stored in m_selectFilesDbStmt and
 * m_selectFilesDbRset, from where getNextBatch() reads the rows.
 */
void EosImportFiles::getFileMetadata()
{
  std::string select_query =
    "SELECT "
    "  A.FILEID,"
    "  A.PARENT_FILEID,"
    "  B.PATH,"
    "  A.FILENAME,"
    "  A.DISK_UID,"
    "  A.DISK_GID,"
    "  A.FILEMODE,"
    "  A.BTIME,"
    "  A.CTIME,"
    "  A.MTIME,"
    "  A.CLASSID,"
    "  A.FILESIZE,"
    "  A.CHECKSUM,"
    "  A.COPYNO,"
    "  A.VID,"
    "  A.FSEQ,"
    "  A.BLOCKID,"
    "  A.S_MTIME "
    "FROM ";
  select_query += m_tableName;
  select_query +=
    " A, CTADIRSHELPER B "
    "WHERE A.PARENT_FILEID = B.FILEID";

  m_selectFilesDbStmt = m_dbConn.createStmt(select_query);

  std::cerr << "Executing SELECT query against " << m_tableName << " table...";
  m_selectFilesDbRset = m_selectFilesDbStmt.executeQuery();
  std::cerr << "done." << std::endl;
}


186
187
188
189
190
/*!
 * Record the outcome of a batch of inserts in the CTAFILESFAILED table.
 *
 * The results are first written to CTAFILESFAILEDTEMP and then merged into CTAFILESFAILED:
 * rows with a non-zero return code are inserted or updated, and existing rows whose new return
 * code is zero are deleted. Successful inserts are only written to the temporary table when
 * retrying (m_is_retry), because only then can there be a row to delete.
 *
 * @param[in]    files      The files of the batch, in the order they were sent to EOS
 * @param[in]    replies    Return codes and messages for the batch; entry i is taken to be the
 *                          result for files[i]
 */
void EosImportFiles::saveFailedFiles(const std::vector<eos::rpc::FileMdProto> &files,
  const eos::rpc::InsertReply &replies)
{
  std::cerr << "Updating list of failed inserts...";

  // Put the set of files into a temporary table.

  // Ensure the temporary table is empty (as the API does a commit after every transaction,
  // we have to preserve rows on commit)
  const std::string sql_truncate = "TRUNCATE TABLE CTAFILESFAILEDTEMP";
  auto stmt_truncate = m_dbConn.createStmt(sql_truncate);
  stmt_truncate.executeNonQuery();

  // The statement text is the same for every row, so build it once outside the loop
  const std::string sql_insert = "INSERT INTO CTAFILESFAILEDTEMP("
    "  FILEID,"
    "  PARENT_FILEID,"
    "  FILENAME,"
    "  DISK_UID,"
    "  DISK_GID,"
    "  FILEMODE,"
    "  BTIME,"
    "  CTIME,"
    "  MTIME,"
    "  CLASSID,"
    "  FILESIZE,"
    "  CHECKSUM,"
    "  RETC,"
    "  MESSAGE) "
    "VALUES("
    "  :FILEID,"
    "  :PARENT_FILEID,"
    "  :FILENAME,"
    "  :DISK_UID,"
    "  :DISK_GID,"
    "  :FILEMODE,"
    "  :BTIME,"
    "  :CTIME,"
    "  :MTIME,"
    "  :CLASSID,"
    "  :FILESIZE,"
    "  :CHECKSUM,"
    "  :RETC,"
    "  :MESSAGE)";

  auto file_it = files.begin();
  auto retc_it = replies.retc().begin();
  auto message_it = replies.message().begin();

  // This could be optimised by doing a batch insert.
  //
  // The three sequences are walked in lock-step, so stop as soon as any one of them is exhausted
  // rather than reading past the end of a shorter one.
  for( ; retc_it != replies.retc().end() && file_it != files.end() && message_it != replies.message().end();
       ++file_it, ++retc_it, ++message_it) {
    auto retc = *retc_it;

    // We only care about successes if we were reading from the failed table (because we need to
    // delete successful retries in this case)
    if(!m_is_retry && retc == 0) continue;

    // If the failure mode was "Attempted to create file with id=<id>, which already exists",
    // verify that the metadata in the EOS namespace is what we expect. If so, ignore the error and
    // treat it as a successful injection.
    if(m_is_retry && retc == EINVAL && compareMD(*file_it)) retc = 0;

    auto stmt = m_dbConn.createStmt(sql_insert);
    stmt.bindUint64(":FILEID", file_it->id());
    stmt.bindUint64(":PARENT_FILEID", file_it->cont_id());
    stmt.bindString(":FILENAME", file_it->name());
    stmt.bindUint64(":DISK_UID", file_it->uid());
    stmt.bindUint64(":DISK_GID", file_it->gid());
    stmt.bindUint64(":FILEMODE", file_it->flags());
    stmt.bindString(":BTIME", file_it->xattrs().at("eos.btime"));
    stmt.bindUint64(":CTIME", file_it->ctime().sec());
    stmt.bindUint64(":MTIME", file_it->mtime().sec());
    stmt.bindString(":CLASSID", m_storageClassRev.at(file_it->xattrs().at("CTA_StorageClass")));
    stmt.bindUint64(":FILESIZE", file_it->size());
    stmt.bindUint64(":CHECKSUM", convertChecksum(file_it->checksum().value()));
    stmt.bindUint64(":RETC", retc);
    // An empty message is stored as "-"
    stmt.bindString(":MESSAGE", message_it->empty() ? "-" : *message_it);

    stmt.executeNonQuery();
  }

  // Return codes left over here have no matching file or message and could not be recorded
  if(retc_it != replies.retc().end()) {
    std::cerr << "warning: more return codes than files or messages, surplus results ignored...";
  }

  // Merge the temporary table into the failure table
  std::cerr << "merging...";
  const std::string sql_merge = "MERGE INTO CTAFILESFAILED A USING CTAFILESFAILEDTEMP B "
    "ON(A.FILEID = B.FILEID) "
    "WHEN MATCHED THEN UPDATE SET"
    "  A.PARENT_FILEID = B.PARENT_FILEID,"
    "  A.FILENAME = B.FILENAME,"
    "  A.DISK_UID = B.DISK_UID,"
    "  A.DISK_GID = B.DISK_GID,"
    "  A.FILEMODE = B.FILEMODE,"
    "  A.BTIME = B.BTIME,"
    "  A.CTIME = B.CTIME,"
    "  A.MTIME = B.MTIME,"
    "  A.CLASSID = B.CLASSID,"
    "  A.FILESIZE = B.FILESIZE,"
    "  A.CHECKSUM = B.CHECKSUM,"
    "  A.RETC = B.RETC,"
    "  A.MESSAGE = B.MESSAGE "
    "DELETE WHERE B.RETC = 0 "
    "WHEN NOT MATCHED THEN INSERT("
    "  A.FILEID,"
    "  A.PARENT_FILEID,"
    "  A.FILENAME,"
    "  A.DISK_UID,"
    "  A.DISK_GID,"
    "  A.FILEMODE,"
    "  A.BTIME,"
    "  A.CTIME,"
    "  A.MTIME,"
    "  A.CLASSID,"
    "  A.FILESIZE,"
    "  A.CHECKSUM,"
    "  A.RETC,"
    "  A.MESSAGE) "
    "VALUES("
    "  B.FILEID,"
    "  B.PARENT_FILEID,"
    "  B.FILENAME,"
    "  B.DISK_UID,"
    "  B.DISK_GID,"
    "  B.FILEMODE,"
    "  B.BTIME,"
    "  B.CTIME,"
    "  B.MTIME,"
    "  B.CLASSID,"
    "  B.FILESIZE,"
    "  B.CHECKSUM,"
    "  B.RETC,"
    "  B.MESSAGE) "
    "WHERE B.RETC != 0";
  auto stmt_merge = m_dbConn.createStmt(sql_merge);
  stmt_merge.executeNonQuery();
  std::cerr << "done." << std::endl;
}


/*!
 * Compare the metadata of an injected file with what is stored in the EOS namespace.
 *
 * The comparison is currently compiled out, so this function always returns false.
 *
 * NOTE(review): the disabled code requests eos::rpc::CONTAINER metadata and reads it through
 * cmd(), although the argument is a file; confirm the intended request type before enabling it.
 *
 * @param[in]    file    Metadata of the file as it was sent to EOS
 *
 * @returns      false (unconditionally, while the comparison is disabled)
 */
bool EosImportFiles::compareMD(const eos::rpc::FileMdProto &file)
{
#if 0
  auto remote_file = m_eosgrpc->GetMD(eos::rpc::CONTAINER, file.id(), file.path());
  const auto &remote_md = remote_file.cmd();

  // Check file metadata is the same
  const bool same_md =
    file.id()           == remote_md.id()          &&
    file.uid()          == remote_md.uid()         &&
    file.gid()          == remote_md.gid()         &&
    file.ctime().sec()  == remote_md.ctime().sec() &&
    file.name()         == remote_md.name()        &&
    (file.path() + '/') == remote_md.path();
  if(!same_md) {
    return false;
  }

  if(file.mode() != remote_md.mode()) {
    // EOS gRPC bug prevents us reading back the file mode
    std::cerr << "Warning: file " << file.id() << ": injected mode=" << file.mode() <<", read back mode=" << remote_md.mode() << std::endl;
  }

  // Check xattrs
  for(const auto &xattr : file.xattrs()) {
    auto xattr_it = remote_md.xattrs().find(xattr.first);
    if(xattr_it == remote_md.xattrs().end() || xattr.second != xattr_it->second) {
      std::cerr << "file " << file.id() << ": xattrs do not match" << std::endl;
      return false;
    }
  }

  return true;
#else
  return false;
#endif
}


356
357
bool EosImportFiles::processBatch()
{
358
359
  auto files = getNextBatch();
  if(files.empty()) return false;
360
361
362
363
364
365
366
367
368
369
370

  // Put results on stdout for debugging
  if(m_is_json) {
    char delim = '[';
    for(auto &file : files) {
      std::cout << delim << XrdSsiPb::Log::DumpProtobuf(&file);
      delim = ',';
    }
    if(delim == ',') std::cout << "]";
  }

371
372
373
  // Inject directories into EOS
  int num_errors = 0;
  eos::rpc::InsertReply replies;
374
  if(m_is_dry_run) {
375
376
377
    for(size_t i = 0; i < files.size(); ++i) {
      replies.add_retc(EPERM);
      replies.add_message("Dry run enabled, import into EOS suppressed");
378
    }
379
380
381
    num_errors = files.size();
  } else {
    num_errors = m_eosgrpc->FileInsert(files, replies);
382
383
384
  }

  auto elapsed_time = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time);
385
386
387
388
389
390
391
  std::cerr << "Processed " << m_total_files << " files in " << elapsed_time.count() << "s (" << num_errors << " failures)" << std::endl;

  // Save errors for later processing
  if(m_is_retry || num_errors > 0) {
    saveFailedFiles(files, replies);
  }

392
  return true;
393
394
395
}


396
std::vector<eos::rpc::FileMdProto> EosImportFiles::getNextBatch()
397
{
398
399
400
  std::vector<eos::rpc::FileMdProto> files;

  if(m_selectFilesDbRset.isEmpty()) return files;
401

402
403
404
405
406
407
408
409
410
411
  // See EOS common/LayoutId.hh for definitions of constants
  const int kAdler         =  0x2;
  const int kReplica       = (0x1 <<  4);
  const int kStripeSize    = (0x0 <<  8); // 1 stripe
  const int kStripeWidth   = (0x0 << 16); // 4K blocks
  const int kBlockChecksum = (0x1 << 20);
  
  // Layout id should be 00100012
  const uint64_t layout = kReplica | kAdler | kStripeSize | kStripeWidth | kBlockChecksum;

412

413
  for(unsigned int b = 0; b < m_batch_size && m_selectFilesDbRset.next(); ++b) {
414
    eos::rpc::FileMdProto file;
415

416
417
418
419
420
    file.set_id(m_selectFilesDbRset.columnUint64("FILEID"));
    file.set_cont_id(m_selectFilesDbRset.columnUint64("PARENT_FILEID"));
    file.set_uid(m_selectFilesDbRset.columnUint64("DISK_UID"));
    file.set_gid(m_selectFilesDbRset.columnUint64("DISK_GID"));
    file.set_size(m_selectFilesDbRset.columnUint64("FILESIZE"));
421
    file.set_layout_id(layout);
422
    // Filemode: filter out S_ISUID, S_ISGID and S_ISVTX because EOS does not follow POSIX semantics for these bits
423
    auto filemode = m_selectFilesDbRset.columnUint64("FILEMODE");
424
425
    filemode &= !(S_ISUID | S_ISGID | S_ISVTX);
    file.set_flags(filemode);
426
    file.mutable_checksum()->set_type("adler");
427
    file.mutable_checksum()->set_value(convertChecksum(m_selectFilesDbRset.columnUint64("CHECKSUM")));
428

429
    // Timestamps
430
431
    file.mutable_ctime()->set_sec(m_selectFilesDbRset.columnUint64("CTIME"));
    file.mutable_mtime()->set_sec(m_selectFilesDbRset.columnUint64("MTIME"));
Michael Davis's avatar
Michael Davis committed
432
    // we don't care about file.stime (sync time, used for CERNBox)
433
    // BTIME is set as an extended attribute (see below)
434

435
    // Filename and path
436
    auto p = eos::client::manglePathname(m_castor_prefix, m_eos_prefix, m_selectFilesDbRset.columnString("PATH"), m_selectFilesDbRset.columnString("FILENAME"));
437
438
    file.set_name(p.basename);
    file.set_path(p.pathname);
439
    // we don't care about link_name
440

Michael Davis's avatar
Michael Davis committed
441
442
443
    // Extended attributes:
    //
    // 1. Archive File ID
444
445
    std::string archiveId(std::to_string(file.id()));
    file.mutable_xattrs()->insert(google::protobuf::MapPair<std::string,std::string>("CTA_ArchiveFileId", archiveId));
Michael Davis's avatar
Michael Davis committed
446
    // 2. Storage Class
447
    auto sc = m_storageClass.at(m_selectFilesDbRset.columnUint64("CLASSID"));
448
    file.mutable_xattrs()->insert(google::protobuf::MapPair<std::string,std::string>("CTA_StorageClass", sc));
Michael Davis's avatar
Michael Davis committed
449
450
451
    // 3. Birth Time
    // POSIX ATIME (Access Time) is used by CASTOR to store the file creation time. EOS calls this "birth time",
    // but there is no place in the namespace to store it, so it is stored as an extended attribute.
452
    auto btime = m_selectFilesDbRset.columnUint64("BTIME");
Michael Davis's avatar
Michael Davis committed
453
    file.mutable_xattrs()->insert(google::protobuf::MapPair<std::string,std::string>("eos.btime", std::to_string(btime)));
454

455
    // Indicate that there is a tape-resident replica of this file
456
    file.mutable_locations()->Add(65535);
Michael Davis's avatar
Michael Davis committed
457
    // we don't care about unlink_locations (placeholder for files scheduled for deletion)
458

459
460
461
    files.push_back(file);
    ++m_total_files;
  }
462
  return files;
463
464
}

465
466
467
468
469
470
471
472

/*!
 * Convert a 32-bit Adler-32 checksum into a 4-byte string, most significant byte first.
 *
 * @param[in]    adler32    Checksum value
 *
 * @returns      String of exactly 4 bytes holding the checksum in big-endian byte order
 */
std::string EosImportFiles::convertChecksum(uint32_t adler32)
{
  std::string encoded(4, '\0');
  encoded[0] = static_cast<char>((adler32 >> 24) & 0xFF);
  encoded[1] = static_cast<char>((adler32 >> 16) & 0xFF);
  encoded[2] = static_cast<char>((adler32 >>  8) & 0xFF);
  encoded[3] = static_cast<char>( adler32        & 0xFF);
  return encoded;
}

473
474
475
476
477
478
479
480

/*!
 * Convert a 4-byte checksum string (most significant byte first) back into a 32-bit value.
 *
 * This is the inverse of convertChecksum(uint32_t). Only the first 4 bytes are read.
 *
 * @param[in]    bytes    Checksum as a byte string, at least 4 bytes long
 *
 * @returns      The checksum as an unsigned 32-bit integer
 *
 * @throws std::runtime_error if bytes is shorter than 4 bytes
 */
uint32_t EosImportFiles::convertChecksum(const std::string &bytes)
{
  if(bytes.size() < 4) {
    throw std::runtime_error("Checksum must be at least 4 bytes long");
  }

  uint32_t adler32 = 0;
  for(int i = 0; i < 4; ++i) {
    // Shift before adding so that the last byte ends up in the low 8 bits, and go through
    // unsigned char so that bytes >= 0x80 are not sign-extended
    adler32 = (adler32 << 8) | static_cast<unsigned char>(bytes[i]);
  }
  return adler32;
}

481
}} // namespace cta::migration