RootEntry.cpp 46.4 KB
Newer Older
1
/*
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include "RootEntry.hpp"
20
21
#include "AgentRegister.hpp"
#include "Agent.hpp"
22
#include "TapePool.hpp"
23
24
#include "TapePoolQueue.hpp"
#include "TapeQueue.hpp"
Eric Cano's avatar
Eric Cano committed
25
#include "DriveRegister.hpp"
26
#include "GenericObject.hpp"
27
#include "SchedulerGlobalLock.hpp"
28
#include <cxxabi.h>
29
#include "ProtocolBuffersAlgorithms.hpp"
30
#include <json-c/json.h>
31
32
33

// construtor, when the backend store exists.
// Checks the existence and correctness of the root entry
34
cta::objectstore::RootEntry::RootEntry(Backend & os):
35
  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(os, "root") {}
36

37
cta::objectstore::RootEntry::RootEntry(GenericObject& go): 
38
  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(go.objectStore()) {
39
40
41
42
43
44
  // Here we transplant the generic object into the new object
  go.transplantHeader(*this);
  // And interpret the header.
  getPayloadFromHeader();
}

45
46
47
// Initialiser. This uses the base object's initialiser and sets the defaults 
// of payload.
void cta::objectstore::RootEntry::initialize() {
48
  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::initialize();
49
50
51
52
  // There is nothing to do for the payload.
  m_payloadInterpreted = true;
}

Eric Cano's avatar
Eric Cano committed
53
54
55
56
57
58
bool cta::objectstore::RootEntry::isEmpty() {
  checkPayloadReadable();
  if (m_payload.storageclasses_size())
    return false;
  if (m_payload.tapepoolpointers_size())
    return false;
59
60
  if (m_payload.has_driveregisterpointer() &&
      m_payload.driveregisterpointer().address().size())
Eric Cano's avatar
Eric Cano committed
61
62
63
    return false;
  if (m_payload.agentregisterintent().size())
    return false;
64
65
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size())
Eric Cano's avatar
Eric Cano committed
66
    return false;
67
68
  if (m_payload.has_schedulerlockpointer() &&
      m_payload.schedulerlockpointer().address().size())
69
    return false;
Eric Cano's avatar
Eric Cano committed
70
71
72
73
74
75
76
77
78
79
80
81
82
  return true;
}

void cta::objectstore::RootEntry::removeIfEmpty() {
  checkPayloadWritable();
  if (!isEmpty()) {
    throw NotEmpty("In RootEntry::removeIfEmpty(): root entry not empty");
  }
  remove();
}



83
// =============================================================================
84
// ================ Admin Hosts manipulations ==================================
85
// =============================================================================
86

87
88
89
90
91
92
// This operator will be used in the following usage of the templated
// findElement and removeOccurences
namespace {
  bool operator==(const std::string & hostName, 
    const cta::objectstore::serializers::AdminHost & ah) {
    return ah.hostname() == hostName;
93
94
95
  }
}

96
97
void cta::objectstore::RootEntry::addAdminHost(const std::string& hostname,
  const CreationLog& log) {
98
  checkPayloadWritable();
99
100
101
102
103
104
105
106
107
  // Check that the host is not listed already
  try {
    serializers::findElement(m_payload.adminhosts(), hostname);
    throw DuplicateEntry("In RootEntry::addAdminHost: entry already exists");
  } catch (serializers::NotFound &) {}
  // Add the host
  auto * ah = m_payload.add_adminhosts();
  ah->set_hostname(hostname);
  log.serialize(*ah->mutable_log());
108
109
}

110
void cta::objectstore::RootEntry::removeAdminHost(const std::string & hostname) {
111
  checkPayloadWritable();
112
113
114
  if (!serializers::isElementPresent(m_payload.adminhosts(), hostname))
    throw NoSuchAdminHost(
      "In RootEntry::removeAdminHost: trying to delete non-existing admin host.");
115
  serializers::removeOccurences(m_payload.mutable_adminhosts(), hostname);
116
117
}

118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
bool cta::objectstore::RootEntry::isAdminHost(const std::string& hostname) {
  checkPayloadReadable();
  return serializers::isElementPresent(m_payload.adminhosts(), hostname);
}

auto cta::objectstore::RootEntry::dumpAdminHosts() -> std::list<AdminHostDump> {
  checkPayloadReadable();
  std::list<AdminHostDump> ret;
  auto &list=m_payload.adminhosts();
  for (auto i=list.begin();i!=list.end(); i++) {
    ret.push_back(AdminHostDump());
    ret.back().hostname = i->hostname();
    ret.back().log.deserialize(i->log());
  }
  return ret;
133
134
}

135
136
137
138
// =============================================================================
// ================ Admin Users manipulations ==================================
// =============================================================================

139
140
141
142
143
144
145
146
147
// This operator will be used in the following usage of the templated
// findElement and removeOccurences
namespace {
  bool operator==(const cta::objectstore::UserIdentity & user, 
    const cta::objectstore::serializers::AdminUser & au) {
    return au.user().uid() == user.uid;
  }
}

148
149
150
151
void cta::objectstore::RootEntry::addAdminUser(const UserIdentity& user,
  const CreationLog& log) {
  checkPayloadWritable();
  // Check that the user does not exists already
152
153
154
  try {
    serializers::findElement(m_payload.adminusers(), user);
    throw DuplicateEntry("In RootEntry::addAdminUser: entry already exists");
155
  } catch (serializers::NotFound &) {}
156
157
158
159
160
161
162
  serializers::AdminUser * au = m_payload.add_adminusers();
  user.serialize(*au->mutable_user());
  log.serialize(*au->mutable_log());
}

void cta::objectstore::RootEntry::removeAdminUser(const UserIdentity& user) {
  checkPayloadWritable();
163
164
165
  if (!serializers::isElementPresent(m_payload.adminusers(), user))
    throw NoSuchAdminUser(
      "In RootEntry::removeAdminUser: trying to delete non-existing admin user.");
166
  serializers::removeOccurences(m_payload.mutable_adminusers(), user);
167
168
169
170
}

bool cta::objectstore::RootEntry::isAdminUser(const UserIdentity& user) {
  checkPayloadReadable();
171
  return serializers::isElementPresent(m_payload.adminusers(), user);
172
173
174
175
}

std::list<cta::objectstore::RootEntry::AdminUserDump> 
cta::objectstore::RootEntry::dumpAdminUsers() {
176
  checkPayloadReadable();
177
178
179
180
181
182
183
184
185
186
  std::list<cta::objectstore::RootEntry::AdminUserDump> ret;
  auto &list=m_payload.adminusers();
  for (auto i=list.begin(); i != list.end(); i++) {
    ret.push_back(AdminUserDump());
    ret.back().log.deserialize(i->log());
    ret.back().user.deserialize(i->user());
  }
  return ret;
}

187
// =============================================================================
188
// ========== Storage Class and archive routes manipulations ===================
189
190
191
192
193
194
195
196
197
198
199
// =============================================================================

// This operator will be used in the following usage of the templated
// findElement and removeOccurences
namespace {
  bool operator==(const std::string & scName, 
    const cta::objectstore::serializers::StorageClass & ssc) {
    return ssc.name() == scName;
  }
}

200
201
202
203
204
205
206
207
208
209
210
211
212
213
void cta::objectstore::RootEntry::checkStorageClassCopyCount(uint16_t copyCount) {
  // We cannot have a copy count of 0
  if (copyCount < 1) {
    throw InvalidCopyNumber(
      "In RootEntry::checkStorageClassCopyCount: copyCount cannot be less than 1");
  }
  if (copyCount > maxCopyCount) {
    std::stringstream err;
    err << "In RootEntry::checkStorageClassCopyCount: copyCount cannot be bigger than "
        << maxCopyCount;
    throw InvalidCopyNumber(err.str());
  }
}

214
215
216
217
218
219
220
void cta::objectstore::RootEntry::addStorageClass(const std::string storageClass, 
    uint16_t copyCount, const CreationLog & log) {
  checkPayloadWritable();
  // Check the storage class does not exist already
  try {
    serializers::findElement(m_payload.storageclasses(), storageClass);
    throw DuplicateEntry("In RootEntry::addStorageClass: trying to create duplicate entry");
221
  } catch (serializers::NotFound &) {}
222
223
  // Check the copy count is valid
  checkStorageClassCopyCount(copyCount);
224
225
226
227
228
229
230
231
232
  // Insert the storage class
  auto * sc = m_payload.mutable_storageclasses()->Add();
  sc->set_name(storageClass);
  sc->set_copycount(copyCount);
  log.serialize(*sc->mutable_log());
}

void cta::objectstore::RootEntry::removeStorageClass(const std::string storageClass) {
  checkPayloadWritable();
233
234
235
236
237
238
239
  if (!serializers::isElementPresent(m_payload.storageclasses(), storageClass))
    throw NoSuchStorageClass(
      "In RootEntry::removeStorageClass: trying to delete non-existing storage class.");
  if (serializers::findElement(m_payload.storageclasses(), storageClass).routes_size()) {
    throw StorageClassHasActiveRoutes(
      "In RootEntry::removeStorageClass: trying to delete storage class with active routes.");
  }
240
241
242
243
  serializers::removeOccurences(m_payload.mutable_storageclasses(), storageClass);
}

void cta::objectstore::RootEntry::setStorageClassCopyCount(
244
  const std::string& storageClass, uint16_t copyCount, const CreationLog & log ) {
245
246
247
248
  checkPayloadWritable();
  auto & sc = serializers::findElement(m_payload.mutable_storageclasses(), storageClass);
  // If we reduce the number of routes, we have to remove the extra ones.
  if (copyCount < sc.copycount()) {
249
    for (size_t i = copyCount+1; i<sc.copycount()+1; i++) {
250
251
252
      serializers::removeOccurences(sc.mutable_routes(), i);
    }
  }
253
  // update the creation log and set the count
254
255
256
257
  sc.set_copycount(copyCount);
}

uint16_t cta::objectstore::RootEntry::getStorageClassCopyCount (
258
  const std::string & storageClass) {
259
260
261
262
263
264
265
266
267
  checkPayloadReadable();
  auto & sc = serializers::findElement(m_payload.storageclasses(), storageClass);
  return sc.copycount();
}

// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
  bool operator==(uint16_t copyNb, 
268
    const cta::objectstore::serializers::ArchiveRoute & ar) {
269
270
271
272
    return ar.copynb() == copyNb;
  }
}

273
void cta::objectstore::RootEntry::addArchiveRoute(const std::string& storageClass,
274
275
276
  uint16_t copyNb, const std::string& tapePool, const CreationLog& cl) {
  checkPayloadWritable();
  // Find the storageClass entry
277
  if (copyNb > maxCopyCount || copyNb <= 0) {
278
    std::stringstream ss;
279
    ss << "In RootEntry::addArchiveRoute: invalid copy number: "  << 
280
281
282
283
        copyNb <<  " > " << maxCopyCount;
    throw InvalidCopyNumber(ss.str());
  }
  auto & sc = serializers::findElement(m_payload.mutable_storageclasses(), storageClass);
284
  if (copyNb > sc.copycount() || copyNb <= 0) {
285
    std::stringstream ss;
286
    ss << "In RootEntry::addArchiveRoute: copy number out of range: " <<
287
        copyNb << " >= " << sc.copycount();
288
    throw InvalidCopyNumber(ss.str());
289
  }
290
  // Find the archive route (if it exists)
291
  try {
292
    // It does: update is not allowed.
293
    auto &ar = serializers::findElement(sc.mutable_routes(), copyNb);
294
    throw ArchiveRouteAlreadyExists("In RootEntry::addArchiveRoute: route already exists");
295
296
297
    // Sanity check: is it the right route?
    if (ar.copynb() != copyNb) {
      throw exception::Exception(
298
        "In RootEntry::addArchiveRoute: internal error: extracted wrong route");
299
    }
300
    throw ArchiveRouteAlreadyExists("In RootEntry::addArchiveRoute: trying to add an existing route");
301
302
303
    cl.serialize(*ar.mutable_log());
    ar.set_tapepool(tapePool);
  } catch (serializers::NotFound &) {
304
305
306
307
308
    // The route is not present yet. Add it if we do not have the same tape pool
    // for 2 routes
    auto & routes = sc.routes();
    for (auto r=routes.begin(); r != routes.end(); r++) {
      if (r->tapepool() == tapePool) {
309
        throw TapePoolUsedInOtherRoute ("In RootEntry::addArchiveRoute: cannot add a second route to the same tape pool");
310
311
      }
    }
312
313
314
315
316
317
318
    auto *ar = sc.mutable_routes()->Add();
    cl.serialize(*ar->mutable_log());
    ar->set_copynb(copyNb);
    ar->set_tapepool(tapePool);
  }
}

319
void cta::objectstore::RootEntry::removeArchiveRoute(
320
321
322
323
  const std::string& storageClass, uint16_t copyNb) {
  checkPayloadWritable();
  // Find the storageClass entry
  auto & sc = serializers::findElement(m_payload.mutable_storageclasses(), storageClass);
324
  if (!serializers::isElementPresent(sc.routes(), copyNb))
325
326
    throw NoSuchArchiveRoute(
      "In RootEntry::removeArchiveRoute: trying to delete non-existing archive route.");
327
328
329
330
  serializers::removeOccurences(sc.mutable_routes(), copyNb);
}


331
std::vector<std::string> cta::objectstore::RootEntry::getArchiveRoutes(const std::string storageClass) {
332
333
334
  checkPayloadReadable();
  auto & sc = serializers::findElement(m_payload.storageclasses(), storageClass);
  std::vector<std::string> ret;
335
336
  int copycount = sc.copycount();
  copycount = copycount;
337
  ret.resize(sc.copycount());
338
339
340
341
342
343
344
  // If the number of routes is not right, fail.
  if (sc.copycount() != (uint32_t) sc.routes_size()) {
    std::stringstream err;
    err << "In RootEntry::getArchiveRoutes: not enough routes defined: "
        << sc.routes_size() << " found out of " << sc.copycount();
    throw IncompleteEntry(err.str());
  }
345
346
  auto & list = sc.routes();
  for (auto i = list.begin(); i!= list.end(); i++) {
347
348
349
    int copyNb = i->copynb();
    copyNb = copyNb;
    ret[i->copynb()-1] = i->tapepool();
350
351
352
353
  }
  return ret;
}

354
auto cta::objectstore::RootEntry::dumpStorageClasses() -> std::list<StorageClassDump> {
355
  checkPayloadReadable();
356
357
358
359
360
361
  std::list<StorageClassDump> ret;
  auto & scl = m_payload.storageclasses();
  for (auto i=scl.begin(); i != scl.end(); i++) {
    ret.push_back(StorageClassDump());
    ret.back().copyCount = i->copycount();
    ret.back().storageClass = i->name();
362
    ret.back().log.deserialize(i->log());
363
364
    auto & arl = i->routes();
    for (auto j=arl.begin(); j!= arl.end(); j++) {
365
      ret.back().routes.push_back(StorageClassDump::ArchiveRouteDump());
366
367
368
369
370
371
372
373
      auto &r = ret.back().routes.back();
      r.copyNumber = j->copynb();
      r.tapePool = j->tapepool();
      r.log.deserialize(j->log());
    }
  }
  return ret;
}
374

375
376
377
378
379
380
381
382
383
384
385
auto cta::objectstore::RootEntry::dumpStorageClass(const std::string& name)
  -> StorageClassDump {
  checkPayloadReadable();
  auto & scl = m_payload.storageclasses();
  for (auto i=scl.begin(); i != scl.end(); i++) {
    if (i->name() == name) {
      StorageClassDump ret;
      ret.copyCount = i->copycount();
      ret.storageClass = i->name();
      auto & arl = i->routes();
      for (auto j=arl.begin(); j!= arl.end(); j++) {
386
        ret.routes.push_back(StorageClassDump::ArchiveRouteDump());
387
388
389
390
391
392
393
394
395
396
397
398
399
400
        auto &r = ret.routes.back();
        r.copyNumber = j->copynb();
        r.tapePool = j->tapepool();
        r.log.deserialize(j->log());
      }
      return ret;
    }
  }
  std::stringstream err;
  err << "In RootEntry::dumpStorageClass: no such storage class: "
      << name;
  throw NoSuchStorageClass(err.str());
}

401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
// =============================================================================
// ========== Libraries manipulations ==========================================
// =============================================================================

// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
  bool operator==(const std::string &l,
    const cta::objectstore::serializers::Library & sl) {
    return sl.name() == l;
  }
}

void cta::objectstore::RootEntry::addLibrary(const std::string& library,
    const CreationLog & log) {
  checkPayloadWritable();
417
  // Check the library does not exist aclready
418
419
420
421
422
423
424
425
426
427
428
429
  try {
    serializers::findElement(m_payload.libraries(), library);
    throw DuplicateEntry("In RootEntry::addLibrary: trying to create duplicate entry");
  } catch (serializers::NotFound &) {}
  // Insert the library
  auto * l = m_payload.mutable_libraries()->Add();
  l->set_name(library);
  log.serialize(*l->mutable_log());
}

void cta::objectstore::RootEntry::removeLibrary(const std::string& library) {
  checkPayloadWritable();
430
431
432
  if (!serializers::isElementPresent(m_payload.libraries(), library))
    throw NoSuchLibrary(
      "In RootEntry::removeLibrary: trying to delete non-existing library.");
433
434
435
436
437
438
439
440
  serializers::removeOccurences(m_payload.mutable_libraries(), library);
}

bool cta::objectstore::RootEntry::libraryExists(const std::string& library) {
  checkPayloadReadable();
  return serializers::isElementPresent(m_payload.libraries(), library);
}

441
auto cta::objectstore::RootEntry::dumpLibraries() -> std::list<LibraryDump> {
442
  checkPayloadReadable();
443
  std::list<LibraryDump> ret;
444
445
  auto & list=m_payload.libraries();
  for (auto i=list.begin(); i!=list.end(); i++) {
446
447
448
    ret.push_back(LibraryDump());
    ret.back().library = i->name();
    ret.back().log.deserialize(i->log());
449
450
451
452
  }
  return ret;
}

453
454
455
456
457
458
459
460
461
462
463
464
465
// =============================================================================
// ========== Tape pools manipulations =========================================
// =============================================================================


// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
  bool operator==(const std::string &tp,
    const cta::objectstore::serializers::TapePoolPointer & tpp) {
    return tpp.name() == tp;
  }
}
466

Eric Cano's avatar
Eric Cano committed
467
std::string cta::objectstore::RootEntry::addOrGetTapePoolAndCommit(const std::string& tapePool,
468
469
  uint32_t nbPartialTapes, uint16_t maxRetriesPerMount, uint16_t maxTotalRetries,
  Agent& agent, const CreationLog& log) {
Eric Cano's avatar
Eric Cano committed
470
  checkPayloadWritable();
471
472
  // Check the tape pool does not already exist
  try {
Eric Cano's avatar
Eric Cano committed
473
    return serializers::findElement(m_payload.tapepoolpointers(), tapePool).address();
474
475
  } catch (serializers::NotFound &) {}
  // Insert the tape pool, then its pointer, with agent intent log update
Eric Cano's avatar
Eric Cano committed
476
  // First generate the intent. We expect the agent to be passed locked.
477
  std::string tapePoolAddress = agent.nextId("tapePool");
478
  // TODO Do we expect the agent to be passed locked or not: to be clarified.
Eric Cano's avatar
Eric Cano committed
479
  ScopedExclusiveLock agl(agent);
480
481
482
483
  agent.fetch();
  agent.addToOwnership(tapePoolAddress);
  agent.commit();
  // Then create the tape pool object
484
  TapePool tp(tapePoolAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
485
486
  tp.initialize(tapePool);
  tp.setOwner(agent.getAddressIfSet());
487
488
  tp.setMaxRetriesWithinMount(maxRetriesPerMount);
  tp.setMaxTotalRetries(maxTotalRetries);
489
  tp.setBackupOwner("root");
490
491
492
493
494
495
  tp.insert();
  ScopedExclusiveLock tpl(tp);
  // Now move the tape pool's ownership to the root entry
  auto * tpp = m_payload.mutable_tapepoolpointers()->Add();
  tpp->set_address(tapePoolAddress);
  tpp->set_name(tapePool);
496
  tpp->set_nbpartialtapes(nbPartialTapes);
Eric Cano's avatar
Eric Cano committed
497
  log.serialize(*tpp->mutable_log());
498
499
500
501
502
503
504
505
506
  // We must commit here to ensure the tape pool object is referenced.
  commit();
  // Now update the tape pool's ownership.
  tp.setOwner(getAddressIfSet());
  tp.setBackupOwner(getAddressIfSet());
  tp.commit();
  // ... and clean up the agent
  agent.removeFromOwnership(tapePoolAddress);
  agent.commit();
Eric Cano's avatar
Eric Cano committed
507
  return tapePoolAddress;
508
}
509

Eric Cano's avatar
Eric Cano committed
510
void cta::objectstore::RootEntry::removeTapePoolAndCommit(const std::string& tapePool) {
511
512
513
514
  checkPayloadWritable();
  // find the address of the tape pool object
  try {
    auto tpp = serializers::findElement(m_payload.tapepoolpointers(), tapePool);
515
516
517
518
519
520
521
    // Check that the tape pool is not referenced by any route
    auto & scl = m_payload.storageclasses();
    for (auto sc=scl.begin(); sc!=scl.end(); sc++) {
      auto & rl=sc->routes();
      for (auto r=rl.begin(); r!=rl.end(); r++) {
        if (r->tapepool() == tapePool)
          throw TapePoolUsedInRoute(
522
            "In RootEntry::removeTapePoolAndCommit: trying to remove a tape pool used in archive route");
523
524
      }
    }
525
    // Open the tape pool object
526
    TapePool tp (tpp.address(), ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
    ScopedExclusiveLock tpl(tp);
    tp.fetch();
    // Verify this is the tapepool we're looking for.
    if (tp.getName() != tapePool) {
      std::stringstream err;
      err << "Unexpected tape pool name found in object pointed to for tape pool: "
          << tapePool << " found: " << tp.getName();
      throw WrongTapePool(err.str());
    }
    // Check the tape pool is empty
    if (!tp.isEmpty()) {
      throw TapePoolNotEmpty ("In RootEntry::removeTapePoolAndCommit: trying to "
          "remove a non-empty tape pool");
    }
    // We can delete the pool
    tp.remove();
    // ... and remove it from our entry
    serializers::removeOccurences(m_payload.mutable_tapepoolpointers(), tapePool);
    // We commit for safety and symmetry with the add operation
    commit();
  } catch (serializers::NotFound &) {
    // No such tape pool. Nothing to to.
549
    throw NoSuchTapePool("In RootEntry::removeTapePoolAndCommit: trying to remove non-existing tape pool");
550
551
  }
}
552

553
554
std::string cta::objectstore::RootEntry::getTapePoolAddress(const std::string& tapePool) {
  checkPayloadReadable();
Eric Cano's avatar
Eric Cano committed
555
556
557
558
559
560
  try {
    auto & tpp = serializers::findElement(m_payload.tapepoolpointers(), tapePool);
    return tpp.address();
  } catch (serializers::NotFound &) {
    throw NotAllocated("In RootEntry::getTapePoolAddress: tape pool not allocated");
  }
561
}
562

563
auto cta::objectstore::RootEntry::dumpTapePools() -> std::list<TapePoolDump> {
564
565
566
567
568
569
570
  checkPayloadReadable();
  std::list<TapePoolDump> ret;
  auto & tpl = m_payload.tapepoolpointers();
  for (auto i = tpl.begin(); i!=tpl.end(); i++) {
    ret.push_back(TapePoolDump());
    ret.back().address = i->address();
    ret.back().tapePool = i->name();
571
    ret.back().nbPartialTapes = i->nbpartialtapes();
572
573
574
575
    ret.back().log.deserialize(i->log());
  }
  return ret;
}
576

577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
// =============================================================================
// ========== Tape pool queues manipulations =========================================
// =============================================================================


// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
  bool operator==(const std::string &tp,
    const cta::objectstore::serializers::TapePoolQueuePointer & tpp) {
    return tpp.name() == tp;
  }
}

std::string cta::objectstore::RootEntry::addOrGetTapePoolQueueAndCommit(const std::string& tapePool, Agent& agent) {
  checkPayloadWritable();
  // Check the tape pool does not already exist
  try {
    return serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool).address();
  } catch (serializers::NotFound &) {}
  // Insert the tape pool, then its pointer, with agent intent log update
  // First generate the intent. We expect the agent to be passed locked.
  std::string tapePoolQueueAddress = agent.nextId("tapePoolQueue");
  // TODO Do we expect the agent to be passed locked or not: to be clarified.
  ScopedExclusiveLock agl(agent);
  agent.fetch();
  agent.addToOwnership(tapePoolQueueAddress);
  agent.commit();
  // Then create the tape pool object
  TapePool tp(tapePoolQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  tp.initialize(tapePool);
  tp.setOwner(agent.getAddressIfSet());
  tp.setBackupOwner("root");
  tp.insert();
  ScopedExclusiveLock tpl(tp);
  // Now move the tape pool's ownership to the root entry
  auto * tpp = m_payload.mutable_tapepoolqueuepointers()->Add();
  tpp->set_address(tapePoolQueueAddress);
  // We must commit here to ensure the tape pool object is referenced.
  commit();
  // Now update the tape pool's ownership.
  tp.setOwner(getAddressIfSet());
  tp.setBackupOwner(getAddressIfSet());
  tp.commit();
  // ... and clean up the agent
  agent.removeFromOwnership(tapePoolQueueAddress);
  agent.commit();
  return tapePoolQueueAddress;
}

void cta::objectstore::RootEntry::removeTapePoolQueueAndCommit(const std::string& tapePool) {
  checkPayloadWritable();
  // find the address of the tape pool object
  try {
    auto tpp = serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool);
    // Open the tape pool object
    TapePoolQueue tp (tpp.address(), ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
    ScopedExclusiveLock tpl(tp);
    tp.fetch();
    // Verify this is the tapepool we're looking for.
    if (tp.getName() != tapePool) {
      std::stringstream err;
      err << "Unexpected tape pool name found in object pointed to for tape pool: "
          << tapePool << " found: " << tp.getName();
      throw WrongTapePoolQueue(err.str());
    }
    // Check the tape pool is empty
    if (!tp.isEmpty()) {
      throw TapePoolQueueNotEmpty ("In RootEntry::removeTapePoolQueueAndCommit: trying to "
          "remove a non-empty tape pool");
    }
    // We can delete the pool
    tp.remove();
    // ... and remove it from our entry
    serializers::removeOccurences(m_payload.mutable_tapepoolqueuepointers(), tapePool);
    // We commit for safety and symmetry with the add operation
    commit();
  } catch (serializers::NotFound &) {
    // No such tape pool. Nothing to to.
    throw NoSuchTapePoolQueue("In RootEntry::removeTapePoolQueueAndCommit: trying to remove non-existing tape pool");
  }
}

std::string cta::objectstore::RootEntry::getTapePoolQueueAddress(const std::string& tapePool) {
  checkPayloadReadable();
  try {
    auto & tpp = serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool);
    return tpp.address();
  } catch (serializers::NotFound &) {
    throw NotAllocated("In RootEntry::getTapePoolQueueAddress: tape pool not allocated");
  }
}

auto cta::objectstore::RootEntry::dumpTapePoolQueues() -> std::list<TapePoolQueueDump> {
  checkPayloadReadable();
  std::list<TapePoolQueueDump> ret;
  auto & tpl = m_payload.tapepoolqueuepointers();
  for (auto i = tpl.begin(); i!=tpl.end(); i++) {
    ret.push_back(TapePoolQueueDump());
    ret.back().address = i->address();
    ret.back().tapePool = i->name();
  }
  return ret;
}

Eric Cano's avatar
Eric Cano committed
682
683
684
685
686
687
688
689
690
691
692
693
694
// =============================================================================
// ================ Drive register manipulation ================================
// =============================================================================

std::string cta::objectstore::RootEntry::addOrGetDriveRegisterPointerAndCommit(
  Agent & agent, const CreationLog & log) {
  checkPayloadWritable();
  // Check if the drive register exists
  try {
    return getDriveRegisterAddress();
  } catch (NotAllocated &) {
    // decide on the object's name and add to agent's intent. We expect the
    // agent to be passed locked.
695
    std::string drAddress (agent.nextId("driveRegister"));
Eric Cano's avatar
Eric Cano committed
696
697
698
699
    ScopedExclusiveLock agl(agent);
    agent.fetch();
    agent.addToOwnership(drAddress);
    agent.commit();
700
    // Then create the drive register object
Eric Cano's avatar
Eric Cano committed
701
702
    DriveRegister dr(drAddress, m_objectStore);
    dr.initialize();
703
704
    dr.setOwner(agent.getAddressIfSet());
    dr.setBackupOwner(getAddressIfSet());
Eric Cano's avatar
Eric Cano committed
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
    dr.insert();
    // Take a lock on drive registry
    ScopedExclusiveLock drLock(dr);
    // Move drive registry ownership to the root entry
    auto * mdrp = m_payload.mutable_driveregisterpointer();
    mdrp->set_address(drAddress);
    log.serialize(*mdrp->mutable_log());
    commit();
    // Record completion in drive registry
    dr.setOwner(getAddressIfSet());
    dr.setBackupOwner(getAddressIfSet());
    dr.commit();
    //... and clean up the agent
    agent.removeFromOwnership(drAddress);
    agent.commit();
    return drAddress;
  }
}

void cta::objectstore::RootEntry::removeDriveRegisterAndCommit() {
  checkPayloadWritable();
  // Get the address of the drive register (nothing to do if there is none)
727
728
  if (!m_payload.has_driveregisterpointer() || 
      !m_payload.driveregisterpointer().address().size())
Eric Cano's avatar
Eric Cano committed
729
730
    return;
  std::string drAddr = m_payload.driveregisterpointer().address();
731
  DriveRegister dr(drAddr, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
Eric Cano's avatar
Eric Cano committed
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
  ScopedExclusiveLock drl(dr);
  dr.fetch();
  // Check the drive register is empty
  if (!dr.isEmpty()) {
    throw DriveRegisterNotEmpty("In RootEntry::removeDriveRegisterAndCommit: "
      "trying to remove a non-empty drive register");
  }
  // we can delete the drive register
  dr.remove();
  // And update the root entry
  m_payload.mutable_driveregisterpointer()->set_address("");
  // We commit for safety and symmetry with the add operation
  commit();
}

std::string cta::objectstore::RootEntry::getDriveRegisterAddress() {
748
  checkPayloadReadable();
749
750
  if (m_payload.has_driveregisterpointer() && 
      m_payload.driveregisterpointer().address().size()) {
Eric Cano's avatar
Eric Cano committed
751
752
753
754
755
756
    return m_payload.driveregisterpointer().address();
  }
  throw NotAllocated("In RootEntry::getDriveRegisterAddress: drive register not allocated");
}


757
758
759
760
// =============================================================================
// ================ Agent register manipulation ================================
// =============================================================================
// Get the name of the agent register (or exception if not available)
761
762
std::string cta::objectstore::RootEntry::getAgentRegisterAddress() {
  checkPayloadReadable();
763
  // If the registry is defined, return it, job done.
764
765
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size())
766
    return m_payload.agentregisterpointer().address();
767
  throw NotAllocated("In RootEntry::getAgentRegister: agentRegister not yet allocated");
768
769
770
}

// Get the name of a (possibly freshly created) agent register
771
std::string cta::objectstore::RootEntry::addOrGetAgentRegisterPointerAndCommit(Agent & agent,
772
773
774
  const CreationLog & log) {
  // Check if the agent register exists
  try {
775
776
    return getAgentRegisterAddress();
  } catch (NotAllocated &) {
777
778
779
    // If we get here, the agent register is not created yet, so we have to do it:
    // lock the entry again, for writing. We take the lock ourselves if needed
    // This will make an autonomous transaction
780
    checkPayloadWritable();
781
    fetch();
782
783
    if (m_payload.has_agentregisterpointer() &&
        m_payload.agentregisterpointer().address().size()) {
784
785
786
      return m_payload.agentregisterpointer().address();
    }
    // decide on the object's name
787
    std::string arAddress (agent.nextId("agentRegister"));
788
    // Record the agent registry in our own intent
789
    addIntendedAgentRegistry(arAddress);
790
791
    commit();
    // Create the agent registry
792
    AgentRegister ar(arAddress, m_objectStore);
793
794
795
796
797
798
799
800
801
    ar.initialize();
    // There is no garbage collection for an agent registry: if it is not
    // plugged to the root entry, it does not exist.
    ar.setOwner("");
    ar.setBackupOwner("");
    ar.insert();
    // Take a lock on agent registry
    ScopedExclusiveLock arLock(ar);
    // Move agent registry from intent to official
802
803
804
805
    auto * marp = m_payload.mutable_agentregisterpointer();
    marp->set_address(arAddress);
    log.serialize(*marp->mutable_log());
    m_payload.set_agentregisterintent("");
806
807
    commit();
    // Record completion in agent registry
808
809
    ar.setOwner(getAddressIfSet());
    ar.setBackupOwner(getAddressIfSet());
810
811
812
    ar.commit();
    // And we are done. Release locks
    arLock.release();
813
814
815
816
    return arAddress;
  }
}

Eric Cano's avatar
Eric Cano committed
817
void cta::objectstore::RootEntry::removeAgentRegisterAndCommit() {
818
819
820
821
822
  checkPayloadWritable();
  // Check that we do have an agent register set. Cleanup a potential intent as
  // well
  if (m_payload.agentregisterintent().size()) {
    AgentRegister iar(m_payload.agentregisterintent(),
823
      ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
824
825
826
827
828
829
830
831
832
    ScopedExclusiveLock iarl(iar);
    // An agent register only referenced in the intent should not be used
    // and hence empty. We'll see that.
    iar.fetch();
    if (!iar.isEmpty()) {
      throw AgentRegisterNotEmpty("In RootEntry::removeAgentRegister: found "
        "a non-empty intended agent register. Internal error.");
    }
    iar.remove();
Eric Cano's avatar
Eric Cano committed
833
834
    m_payload.set_agentregisterintent("");
    commit();
835
  }
836
837
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size()) {
838
    AgentRegister ar(m_payload.agentregisterpointer().address(),
839
      ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
840
    ScopedExclusiveLock arl(ar);
Eric Cano's avatar
Eric Cano committed
841
    ar.fetch();
842
    if (!ar.isEmpty()) {
Eric Cano's avatar
Eric Cano committed
843
844
      throw AgentRegisterNotEmpty("In RootEntry::removeAgentRegister: the agent "
        "register is not empty. Cannot remove.");
845
    }
Eric Cano's avatar
Eric Cano committed
846
847
848
    ar.remove();
    m_payload.mutable_agentregisterpointer()->set_address("");
    commit();
849
850
851
852
853
854
855
856
857
  }
}

void cta::objectstore::RootEntry::addIntendedAgentRegistry(const std::string& address) {
  checkPayloadWritable();
  // We are supposed to have only one intended agent registry at a time.
  // If we got the lock and there is one entry, this means the previous
  // attempt to create one did not succeed.
  // When getting here, having a set pointer to the registry is an error.
858
859
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size()) {
860
861
862
863
864
865
866
867
868
    throw exception::Exception("In cta::objectstore::RootEntry::addIntendedAgentRegistry:"
        " pointer to registry already set");
  }
  if (m_payload.agentregisterintent().size()) {
    // The intended object might not have made it to the official pointer.
    // If it did, we just clean up the intent.
    // If it did not, we clean up the object if present, clean up the intent
    // and replace it with the new one.
    // We do not recycle the object, as the state is doubtful.
869
    if (ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore.exists(
870
871
      m_payload.agentregisterintent())) {
      AgentRegister iar(m_payload.agentregisterintent(),
872
        ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
873
874
875
876
877
878
      iar.fetch();
      if (!iar.isEmpty()) {
        throw AgentRegisterNotEmpty("In RootEntry::addIntendedAgentRegistry, "
          "found a non-empty intended agent register. Internal Error.");
      }
      iar.remove();
879
880
881
882
883
    }
  }
  m_payload.set_agentregisterintent(address);
}

884
885
886
887
888
889
890
// =============================================================================
// ================ Scheduler global lock manipulation =========================
// =============================================================================

std::string cta::objectstore::RootEntry::getSchedulerGlobalLock() {
  checkPayloadReadable();
  // If the scheduler lock is defined, return it, job done.
891
892
  if (m_payload.has_schedulerlockpointer() &&
      m_payload.schedulerlockpointer().address().size())
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
    return m_payload.schedulerlockpointer().address();
  throw NotAllocated("In RootEntry::getAgentRegister: scheduler global lock not yet allocated");
}

// Get the name of a (possibly freshly created) scheduler global lock
std::string cta::objectstore::RootEntry::addOrGetSchedulerGlobalLockAndCommit(Agent & agent,
  const CreationLog & log) {
  checkPayloadWritable();
  // Check if the drive register exists
  try {
    return getSchedulerGlobalLock();
  } catch (NotAllocated &) {
    // decide on the object's name and add to agent's intent. We expect the
    // agent to be passed locked.
    std::string sglAddress (agent.nextId("schedulerGlobalLock"));
    ScopedExclusiveLock agl(agent);
    agent.fetch();
    agent.addToOwnership(sglAddress);
    agent.commit();
    // Then create the drive register object
    SchedulerGlobalLock sgl(sglAddress, m_objectStore);
    sgl.initialize();
    sgl.setOwner(agent.getAddressIfSet());
    sgl.setBackupOwner(getAddressIfSet());
    sgl.insert();
    // Take a lock on scheduler global lock
    ScopedExclusiveLock sglLock(sgl);
    // Move drive registry ownership to the root entry
    auto * msgl = m_payload.mutable_schedulerlockpointer();
    msgl->set_address(sglAddress);
    log.serialize(*msgl->mutable_log());
    commit();
    // Record completion in scheduler global lock
    sgl.setOwner(getAddressIfSet());
    sgl.setBackupOwner(getAddressIfSet());
    sgl.commit();
    //... and clean up the agent
    agent.removeFromOwnership(sglAddress);
    agent.commit();
    return sglAddress;
  }
}

void cta::objectstore::RootEntry::removeSchedulerGlobalLockAndCommit() {
  checkPayloadWritable();
  // Get the address of the scheduler lock (nothing to do if there is none)
939
940
  if (!m_payload.has_schedulerlockpointer() ||
      !m_payload.schedulerlockpointer().address().size())
941
942
    return;
  std::string sglAddress = m_payload.schedulerlockpointer().address();
943
  SchedulerGlobalLock sgl(sglAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
  ScopedExclusiveLock sgll(sgl);
  sgl.fetch();
  // Check the drive register is empty
  if (!sgl.isEmpty()) {
    throw DriveRegisterNotEmpty("In RootEntry::removeSchedulerGlobalLockAndCommit: "
      "trying to remove a non-empty scheduler global lock");
  }
  // we can delete the drive register
  sgl.remove();
  // And update the root entry
  m_payload.mutable_schedulerlockpointer()->set_address("");
  // We commit for safety and symmetry with the add operation
  commit();
}


960
// Dump the root entry
961
std::string cta::objectstore::RootEntry::dump () {  
962
  checkPayloadReadable();
963
  std::stringstream ret;
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
  ret << "RootEntry" << std::endl;
  struct json_object * jo = json_object_new_object();
  
  json_object_object_add(jo, "agentregisterintent", json_object_new_string(m_payload.agentregisterintent().c_str()));
  
  {
    json_object * pointer = json_object_new_object();
    json_object_object_add(pointer, "address", json_object_new_string(m_payload.driveregisterpointer().address().c_str()));
    json_object * jlog = json_object_new_object();
    json_object_object_add(jlog, "comment", json_object_new_string(m_payload.driveregisterpointer().log().comment().c_str()));
    json_object_object_add(jlog, "host", json_object_new_string(m_payload.driveregisterpointer().log().host().c_str()));
    json_object_object_add(jlog, "time", json_object_new_int64(m_payload.driveregisterpointer().log().time()));
    json_object * id = json_object_new_object();
    json_object_object_add(id, "uid", json_object_new_int(m_payload.driveregisterpointer().log().user().uid()));
    json_object_object_add(id, "gid", json_object_new_int(m_payload.driveregisterpointer().log().user().gid()));
    json_object_object_add(jlog, "user", id);
    json_object_object_add(pointer, "log", jlog);
    json_object_object_add(jo, "driveregisterpointer", pointer);
  }
  {
    json_object * pointer = json_object_new_object();
    json_object_object_add(pointer, "address", json_object_new_string(m_payload.agentregisterpointer().address().c_str()));
    json_object * jlog = json_object_new_object();
    json_object_object_add(jlog, "comment", json_object_new_string(m_payload.agentregisterpointer().log().comment().c_str()));
    json_object_object_add(jlog, "host", json_object_new_string(m_payload.agentregisterpointer().log().host().c_str()));
    json_object_object_add(jlog, "time", json_object_new_int64(m_payload.agentregisterpointer().log().time()));
    json_object * id = json_object_new_object();
    json_object_object_add(id, "uid", json_object_new_int(m_payload.agentregisterpointer().log().user().uid()));
    json_object_object_add(id, "gid", json_object_new_int(m_payload.agentregisterpointer().log().user().gid()));
    json_object_object_add(jlog, "user", id);
    json_object_object_add(pointer, "log", jlog);
    json_object_object_add(jo, "agentregisterpointer", pointer);
  }
  {
    json_object * pointer = json_object_new_object();
    json_object_object_add(pointer, "address", json_object_new_string(m_payload.schedulerlockpointer().address().c_str()));
    json_object * jlog = json_object_new_object();