/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include "RootEntry.hpp"
20
21
#include "AgentRegister.hpp"
#include "Agent.hpp"
22
#include "AgentReference.hpp"
23
24
#include "ArchiveQueue.hpp"
#include "RetrieveQueue.hpp"
Eric Cano's avatar
Eric Cano committed
25
#include "DriveRegister.hpp"
26
#include "GenericObject.hpp"
27
#include "SchedulerGlobalLock.hpp"
28
#include <cxxabi.h>
29
#include "ProtocolBuffersAlgorithms.hpp"
30
#include <json-c/json.h>
31

32
33
namespace cta { namespace objectstore {

34
35
// Constructor, used when the backend store exists.
// Checks the existence and correctness of the root entry.
36
// Creates a handle on the object named "root" in the backend `os`. All the
// work is delegated to the ObjectOps base class constructor.
RootEntry::RootEntry(Backend & os)
  : ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(os, "root") {
}
38

39
// Builds a RootEntry from a GenericObject: the new object is attached to the
// same object store, takes over the generic object's header, and then
// interprets its own payload from that header.
RootEntry::RootEntry(GenericObject& go)
  : ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(go.objectStore()) {
  // Step 1: move the generic object's header into this object.
  go.transplantHeader(*this);
  // Step 2: decode the payload carried by the transplanted header.
  getPayloadFromHeader();
}

47
48
// Initialiser. This uses the base object's initialiser and sets the defaults 
// of payload.
49
void RootEntry::initialize() {
50
  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::initialize();
51
52
53
54
  // There is nothing to do for the payload.
  m_payloadInterpreted = true;
}

55
bool RootEntry::isEmpty() {
Eric Cano's avatar
Eric Cano committed
56
  checkPayloadReadable();
57
58
  if (m_payload.has_driveregisterpointer() &&
      m_payload.driveregisterpointer().address().size())
Eric Cano's avatar
Eric Cano committed
59
60
61
    return false;
  if (m_payload.agentregisterintent().size())
    return false;
62
63
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size())
Eric Cano's avatar
Eric Cano committed
64
    return false;
65
66
  if (m_payload.has_schedulerlockpointer() &&
      m_payload.schedulerlockpointer().address().size())
67
    return false;
68
69
  if (m_payload.archivequeuepointers().size())
    return false;
70
  if (m_payload.retrievequeuepointers().size())
71
    return false;
Eric Cano's avatar
Eric Cano committed
72
73
74
  return true;
}

75
void RootEntry::removeIfEmpty() {
Eric Cano's avatar
Eric Cano committed
76
77
78
79
80
81
82
  checkPayloadWritable();
  if (!isEmpty()) {
    throw NotEmpty("In RootEntry::removeIfEmpty(): root entry not empty");
  }
  remove();
}

83
// =============================================================================
84
// ========== Archive queues manipulations =====================================
85
86
87
88
89
90
91
// =============================================================================


// This operator is used by the findElement and removeOccurences calls in the
// archive queue functions below.
namespace {
  bool operator==(const std::string &tp,
92
    const serializers::ArchiveQueuePointer & tpp) {
93
94
95
96
    return tpp.name() == tp;
  }
}

97
std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, AgentReference& agentRef) {
98
  checkPayloadWritable();
99
  // Check the archive queue does not already exist
100
  try {
101
    return serializers::findElement(m_payload.archivequeuepointers(), tapePool).address();
102
  } catch (serializers::NotFound &) {}
103
  // Insert the archive queue, then its pointer, with agent intent log update
104
  // First generate the intent. We expect the agent to be passed locked.
105
  std::string archiveQueueAddress = agentRef.nextId("archiveQueue");
106
  // TODO Do we expect the agent to be passed locked or not: to be clarified.
107
  Agent agent(agentRef.getAgentAddress(), m_objectStore);
108
109
  ScopedExclusiveLock agl(agent);
  agent.fetch();
110
  agent.addToOwnership(archiveQueueAddress);
111
  agent.commit();
112
  // Then create the tape pool queue object
113
114
115
116
117
118
  ArchiveQueue aq(archiveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  aq.initialize(tapePool);
  aq.setOwner(agent.getAddressIfSet());
  aq.setBackupOwner("root");
  aq.insert();
  ScopedExclusiveLock tpl(aq);
119
  // Now move the tape pool's ownership to the root entry
120
  auto * tpp = m_payload.mutable_archivequeuepointers()->Add();
121
  tpp->set_address(archiveQueueAddress);
122
  tpp->set_name(tapePool);
123
124
125
  // We must commit here to ensure the tape pool object is referenced.
  commit();
  // Now update the tape pool's ownership.
126
127
128
  aq.setOwner(getAddressIfSet());
  aq.setBackupOwner(getAddressIfSet());
  aq.commit();
129
  // ... and clean up the agent
130
  agent.removeFromOwnership(archiveQueueAddress);
131
  agent.commit();
132
  return archiveQueueAddress;
133
134
}

135
void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool) {
136
  checkPayloadWritable();
137
  // find the address of the archive queue object
138
  try {
139
    auto aqp = serializers::findElement(m_payload.archivequeuepointers(), tapePool);
140
    // Open the tape pool object
141
    ArchiveQueue aq (aqp.address(), m_objectStore);
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
    ScopedExclusiveLock aql;
    try {
      aql.lock(aq);
      aq.fetch();
    } catch (cta::exception::Exception & ex) {
      // The archive queue seems to not be there. Make sure this is the case:
      if (aq.exists()) {
        // We failed to access the queue, yet it is present. This is an error.
        // Let the exception pass through.
        throw;
      } else {
        // The queue object is already gone. We can skip to removing the 
        // reference from the RootEntry
        goto deleteFromRootEntry;
      }
    }
    // Verify this is the archive queue we're looking for.
    if (aq.getTapePool() != tapePool) {
160
      std::stringstream err;
161
      err << "In RootEntry::removeArchiveQueueAndCommit(): Unexpected tape pool name found in archive queue pointed to for tape pool: "
162
163
          << tapePool << " found: " << aq.getTapePool();
      throw WrongArchiveQueue(err.str());
164
    }
165
    // Check the archive queue is empty
166
    if (!aq.isEmpty()) {
167
      throw ArchivelQueueNotEmpty ("In RootEntry::removeArchiveQueueAndCommit(): trying to "
168
169
          "remove a non-empty tape pool");
    }
170
    // We can delete the queue
171
    aq.remove();
172
  deleteFromRootEntry:
173
    // ... and remove it from our entry
174
    serializers::removeOccurences(m_payload.mutable_archivequeuepointers(), tapePool);
175
176
177
178
    // We commit for safety and symmetry with the add operation
    commit();
  } catch (serializers::NotFound &) {
    // No such tape pool. Nothing to to.
179
    throw NoSuchArchiveQueue("In RootEntry::removeArchiveQueueAndCommit(): trying to remove non-existing archive queue");
180
181
182
  }
}

183
std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool) {
184
185
  checkPayloadReadable();
  try {
186
    auto & tpp = serializers::findElement(m_payload.archivequeuepointers(), tapePool);
187
188
    return tpp.address();
  } catch (serializers::NotFound &) {
189
    throw NoSuchArchiveQueue("In RootEntry::getArchiveQueueAddress: archive queue not allocated");
190
191
192
  }
}

193
// Lists every archive queue pointer of the root entry as an
// (address, tapePool) pair, in payload order. The payload must be readable.
auto RootEntry::dumpArchiveQueues() -> std::list<ArchiveQueueDump> {
  checkPayloadReadable();
  std::list<ArchiveQueueDump> dumps;
  for (const auto & queuePointer : m_payload.archivequeuepointers()) {
    dumps.emplace_back();
    ArchiveQueueDump & entry = dumps.back();
    entry.address = queuePointer.address();
    entry.tapePool = queuePointer.name();
  }
  return dumps;
}

205
206
207
208
// =============================================================================
// ========== Retrieve queues manipulations ====================================
// =============================================================================

209
210
211
212
213
214
215
216
217
// This operator is used by the findElement and removeOccurences calls in the
// retrieve queue functions below.
namespace {
  bool operator==(const std::string &vid,
    const serializers::RetrieveQueuePointer & tpp) {
    return tpp.vid() == vid;
  }
}

218
std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, AgentReference& agentRef) {
219
220
221
  checkPayloadWritable();
  // Check the retrieve queue does not already exist
  try {
222
    return serializers::findElement(m_payload.retrievequeuepointers(), vid).address();
223
224
225
  } catch (serializers::NotFound &) {}
  // Insert the retrieve queue, then its pointer, with agent intent log update
  // First generate the intent. We expect the agent to be passed locked.
226
  std::string retrieveQueueAddress = agentRef.nextId("retriveQueue");
227
  // TODO Do we expect the agent to be passed locked or not: to be clarified.
228
  Agent agent(agentRef.getAgentAddress(), m_objectStore);
229
230
231
232
233
234
235
236
237
238
239
240
  ScopedExclusiveLock agl(agent);
  agent.fetch();
  agent.addToOwnership(retrieveQueueAddress);
  agent.commit();
  // Then create the tape pool queue object
  RetrieveQueue rq(retrieveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  rq.initialize(vid);
  rq.setOwner(agent.getAddressIfSet());
  rq.setBackupOwner("root");
  rq.insert();
  ScopedExclusiveLock tpl(rq);
  // Now move the tape pool's ownership to the root entry
241
  auto * rqp = m_payload.mutable_retrievequeuepointers()->Add();
242
  rqp->set_address(retrieveQueueAddress);
243
  rqp->set_vid(vid);
244
245
246
247
248
249
250
251
252
253
  // We must commit here to ensure the tape pool object is referenced.
  commit();
  // Now update the tape pool's ownership.
  rq.setOwner(getAddressIfSet());
  rq.setBackupOwner(getAddressIfSet());
  rq.commit();
  // ... and clean up the agent
  agent.removeFromOwnership(retrieveQueueAddress);
  agent.commit();
  return retrieveQueueAddress;
254
255
}

256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
// Deletes the retrieve queue object of tape `vid` and drops its pointer from
// the root entry, then commits the root entry.
// Throws NoSuchRetrieveQueue when no pointer exists for the vid,
// WrongArchiveQueue when the pointed queue belongs to another vid and
// RetrieveQueueNotEmpty when the queue still has content. When the queue
// object is already gone from the store, only the pointer is removed.
// The payload must be writable.
// NOTE(review): the vid mismatch is reported with WrongArchiveQueue; the type
// is kept so that existing callers are not affected.
void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid) {
  checkPayloadWritable();
  try {
    // Look up the pointer (NotFound is handled at the bottom).
    auto rqp = serializers::findElement(m_payload.retrievequeuepointers(), vid);
    RetrieveQueue rq(rqp.address(), m_objectStore);
    ScopedExclusiveLock rql;
    bool queueObjectPresent = true;
    try {
      rql.lock(rq);
      rq.fetch();
    } catch (cta::exception::Exception &) {
      // Accessing the queue failed. If the object is nevertheless present,
      // this is a genuine error and the exception is passed on.
      if (rq.exists()) {
        throw;
      }
      // Otherwise the object is already gone: only the reference in the root
      // entry remains to be cleaned up.
      queueObjectPresent = false;
    }
    if (queueObjectPresent) {
      // Make sure the pointer led us to the queue of the right tape.
      if (rq.getVid() != vid) {
        std::stringstream err;
        err << "In RootEntry::removeRetrieveQueueAndCommit(): Unexpected vid found in retrieve queue pointed to for vid: "
            << vid << " found: " << rq.getVid();
        throw WrongArchiveQueue(err.str());
      }
      // Only an empty queue may be deleted.
      if (!rq.isEmpty()) {
        throw RetrieveQueueNotEmpty("In RootEntry::removeRetrieveQueueAndCommit(): trying to "
            "remove a non-empty retrieve queue");
      }
      rq.remove();
    }
    // Drop the reference from the root entry and commit, for safety and
    // symmetry with the add operation.
    serializers::removeOccurences(m_payload.mutable_retrievequeuepointers(), vid);
    commit();
  } catch (serializers::NotFound &) {
    // No pointer for this vid: report it to the caller.
    throw NoSuchRetrieveQueue("In RootEntry::removeRetrieveQueueAndCommit(): trying to remove non-existing retrieve queue");
  }
}


305
306
307
308
309
310
311
312
// Placeholder: always throws an exception whose message is
// "Not implemented: " followed by this function's signature.
std::string RootEntry::getRetrieveQueue(const std::string& vid) {
  std::string message("Not implemented: ");
  message += __PRETTY_FUNCTION__;
  throw cta::exception::Exception(message);
}

// Placeholder: always throws an exception whose message is
// "Not implemented: " followed by this function's signature.
void RootEntry::removeArchiveQueueIfAddressMatchesAndCommit(const std::string& tapePool, const std::string& archiveQueueAddress) {
  std::string message("Not implemented: ");
  message += __PRETTY_FUNCTION__;
  throw cta::exception::Exception(message);
}

313
314
315
316
317
318
319
320
321
322
323
324
// Lists every retrieve queue pointer of the root entry as an (address, vid)
// pair, in payload order. The payload must be readable.
auto RootEntry::dumpRetrieveQueues() -> std::list<RetrieveQueueDump> {
  checkPayloadReadable();
  std::list<RetrieveQueueDump> dumps;
  for (const auto & queuePointer : m_payload.retrievequeuepointers()) {
    dumps.emplace_back();
    RetrieveQueueDump & entry = dumps.back();
    entry.address = queuePointer.address();
    entry.vid = queuePointer.vid();
  }
  return dumps;
}

Eric Cano's avatar
Eric Cano committed
325
326
327
328
// =============================================================================
// ================ Drive register manipulation ================================
// =============================================================================

329
std::string RootEntry::addOrGetDriveRegisterPointerAndCommit(
330
  AgentReference& agentRef, const EntryLogSerDeser & log) {
Eric Cano's avatar
Eric Cano committed
331
332
333
334
335
336
337
  checkPayloadWritable();
  // Check if the drive register exists
  try {
    return getDriveRegisterAddress();
  } catch (NotAllocated &) {
    // decide on the object's name and add to agent's intent. We expect the
    // agent to be passed locked.
338
339
    std::string drAddress (agentRef.nextId("driveRegister"));
    Agent agent(agentRef.getAgentAddress(), m_objectStore);
Eric Cano's avatar
Eric Cano committed
340
341
342
343
    ScopedExclusiveLock agl(agent);
    agent.fetch();
    agent.addToOwnership(drAddress);
    agent.commit();
344
    // Then create the drive register object
Eric Cano's avatar
Eric Cano committed
345
346
    DriveRegister dr(drAddress, m_objectStore);
    dr.initialize();
347
348
    dr.setOwner(agent.getAddressIfSet());
    dr.setBackupOwner(getAddressIfSet());
Eric Cano's avatar
Eric Cano committed
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
    dr.insert();
    // Take a lock on drive registry
    ScopedExclusiveLock drLock(dr);
    // Move drive registry ownership to the root entry
    auto * mdrp = m_payload.mutable_driveregisterpointer();
    mdrp->set_address(drAddress);
    log.serialize(*mdrp->mutable_log());
    commit();
    // Record completion in drive registry
    dr.setOwner(getAddressIfSet());
    dr.setBackupOwner(getAddressIfSet());
    dr.commit();
    //... and clean up the agent
    agent.removeFromOwnership(drAddress);
    agent.commit();
    return drAddress;
  }
}

368
void RootEntry::removeDriveRegisterAndCommit() {
Eric Cano's avatar
Eric Cano committed
369
370
  checkPayloadWritable();
  // Get the address of the drive register (nothing to do if there is none)
371
372
  if (!m_payload.has_driveregisterpointer() || 
      !m_payload.driveregisterpointer().address().size())
Eric Cano's avatar
Eric Cano committed
373
374
    return;
  std::string drAddr = m_payload.driveregisterpointer().address();
375
  DriveRegister dr(drAddr, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
Eric Cano's avatar
Eric Cano committed
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
  ScopedExclusiveLock drl(dr);
  dr.fetch();
  // Check the drive register is empty
  if (!dr.isEmpty()) {
    throw DriveRegisterNotEmpty("In RootEntry::removeDriveRegisterAndCommit: "
      "trying to remove a non-empty drive register");
  }
  // we can delete the drive register
  dr.remove();
  // And update the root entry
  m_payload.mutable_driveregisterpointer()->set_address("");
  // We commit for safety and symmetry with the add operation
  commit();
}

391
std::string RootEntry::getDriveRegisterAddress() {
392
  checkPayloadReadable();
393
394
  if (m_payload.has_driveregisterpointer() && 
      m_payload.driveregisterpointer().address().size()) {
Eric Cano's avatar
Eric Cano committed
395
396
397
398
399
400
    return m_payload.driveregisterpointer().address();
  }
  throw NotAllocated("In RootEntry::getDriveRegisterAddress: drive register not allocated");
}


401
402
403
404
// =============================================================================
// ================ Agent register manipulation ================================
// =============================================================================
// Get the name of the agent register (or exception if not available)
405
std::string RootEntry::getAgentRegisterAddress() {
406
  checkPayloadReadable();
407
  // If the registry is defined, return it, job done.
408
409
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size())
410
    return m_payload.agentregisterpointer().address();
411
  throw NotAllocated("In RootEntry::getAgentRegister: agentRegister not yet allocated");
412
413
414
}

// Get the name of a (possibly freshly created) agent register
415
std::string RootEntry::addOrGetAgentRegisterPointerAndCommit(AgentReference& agentRef,
416
  const EntryLogSerDeser & log) {
417
418
  // Check if the agent register exists
  try {
419
420
    return getAgentRegisterAddress();
  } catch (NotAllocated &) {
421
422
423
    // If we get here, the agent register is not created yet, so we have to do it:
    // lock the entry again, for writing. We take the lock ourselves if needed
    // This will make an autonomous transaction
424
    checkPayloadWritable();
425
    fetch();
426
427
    if (m_payload.has_agentregisterpointer() &&
        m_payload.agentregisterpointer().address().size()) {
428
429
430
      return m_payload.agentregisterpointer().address();
    }
    // decide on the object's name
431
    std::string arAddress (agentRef.nextId("agentRegister"));
432
    // Record the agent registry in our own intent
433
    addIntendedAgentRegistry(arAddress);
434
435
    commit();
    // Create the agent registry
436
    AgentRegister ar(arAddress, m_objectStore);
437
438
439
440
441
442
443
444
445
    ar.initialize();
    // There is no garbage collection for an agent registry: if it is not
    // plugged to the root entry, it does not exist.
    ar.setOwner("");
    ar.setBackupOwner("");
    ar.insert();
    // Take a lock on agent registry
    ScopedExclusiveLock arLock(ar);
    // Move agent registry from intent to official
446
447
448
449
    auto * marp = m_payload.mutable_agentregisterpointer();
    marp->set_address(arAddress);
    log.serialize(*marp->mutable_log());
    m_payload.set_agentregisterintent("");
450
451
    commit();
    // Record completion in agent registry
452
453
    ar.setOwner(getAddressIfSet());
    ar.setBackupOwner(getAddressIfSet());
454
455
456
    ar.commit();
    // And we are done. Release locks
    arLock.release();
457
458
459
460
    return arAddress;
  }
}

461
void RootEntry::removeAgentRegisterAndCommit() {
462
463
464
465
466
  checkPayloadWritable();
  // Check that we do have an agent register set. Cleanup a potential intent as
  // well
  if (m_payload.agentregisterintent().size()) {
    AgentRegister iar(m_payload.agentregisterintent(),
467
      ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
468
469
470
471
472
473
474
475
476
    ScopedExclusiveLock iarl(iar);
    // An agent register only referenced in the intent should not be used
    // and hence empty. We'll see that.
    iar.fetch();
    if (!iar.isEmpty()) {
      throw AgentRegisterNotEmpty("In RootEntry::removeAgentRegister: found "
        "a non-empty intended agent register. Internal error.");
    }
    iar.remove();
Eric Cano's avatar
Eric Cano committed
477
478
    m_payload.set_agentregisterintent("");
    commit();
479
  }
480
481
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size()) {
482
    AgentRegister ar(m_payload.agentregisterpointer().address(),
483
      ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
484
    ScopedExclusiveLock arl(ar);
Eric Cano's avatar
Eric Cano committed
485
    ar.fetch();
486
    if (!ar.isEmpty()) {
Eric Cano's avatar
Eric Cano committed
487
488
      throw AgentRegisterNotEmpty("In RootEntry::removeAgentRegister: the agent "
        "register is not empty. Cannot remove.");
489
    }
Eric Cano's avatar
Eric Cano committed
490
491
492
    ar.remove();
    m_payload.mutable_agentregisterpointer()->set_address("");
    commit();
493
494
495
  }
}

496
void RootEntry::addIntendedAgentRegistry(const std::string& address) {
497
498
499
500
501
  checkPayloadWritable();
  // We are supposed to have only one intended agent registry at a time.
  // If we got the lock and there is one entry, this means the previous
  // attempt to create one did not succeed.
  // When getting here, having a set pointer to the registry is an error.
502
503
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size()) {
504
    throw exception::Exception("In RootEntry::addIntendedAgentRegistry:"
505
506
507
508
509
510
511
512
        " pointer to registry already set");
  }
  if (m_payload.agentregisterintent().size()) {
    // The intended object might not have made it to the official pointer.
    // If it did, we just clean up the intent.
    // If it did not, we clean up the object if present, clean up the intent
    // and replace it with the new one.
    // We do not recycle the object, as the state is doubtful.
513
    if (ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore.exists(
514
515
      m_payload.agentregisterintent())) {
      AgentRegister iar(m_payload.agentregisterintent(),
516
        ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
517
518
519
520
521
522
      iar.fetch();
      if (!iar.isEmpty()) {
        throw AgentRegisterNotEmpty("In RootEntry::addIntendedAgentRegistry, "
          "found a non-empty intended agent register. Internal Error.");
      }
      iar.remove();
523
524
525
526
527
    }
  }
  m_payload.set_agentregisterintent(address);
}

528
529
530
531
// =============================================================================
// ================ Scheduler global lock manipulation =========================
// =============================================================================

532
std::string RootEntry::getSchedulerGlobalLock() {
533
534
  checkPayloadReadable();
  // If the scheduler lock is defined, return it, job done.
535
536
  if (m_payload.has_schedulerlockpointer() &&
      m_payload.schedulerlockpointer().address().size())
537
538
539
540
541
    return m_payload.schedulerlockpointer().address();
  throw NotAllocated("In RootEntry::getAgentRegister: scheduler global lock not yet allocated");
}

// Get the name of a (possibly freshly created) scheduler global lock
542
std::string RootEntry::addOrGetSchedulerGlobalLockAndCommit(AgentReference& agentRef,
543
  const EntryLogSerDeser & log) {
544
545
546
547
548
549
550
  checkPayloadWritable();
  // Check if the drive register exists
  try {
    return getSchedulerGlobalLock();
  } catch (NotAllocated &) {
    // decide on the object's name and add to agent's intent. We expect the
    // agent to be passed locked.
551
552
    std::string sglAddress (agentRef.nextId("schedulerGlobalLock"));
    Agent agent(agentRef.getAgentAddress(), m_objectStore);
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
    ScopedExclusiveLock agl(agent);
    agent.fetch();
    agent.addToOwnership(sglAddress);
    agent.commit();
    // Then create the drive register object
    SchedulerGlobalLock sgl(sglAddress, m_objectStore);
    sgl.initialize();
    sgl.setOwner(agent.getAddressIfSet());
    sgl.setBackupOwner(getAddressIfSet());
    sgl.insert();
    // Take a lock on scheduler global lock
    ScopedExclusiveLock sglLock(sgl);
    // Move drive registry ownership to the root entry
    auto * msgl = m_payload.mutable_schedulerlockpointer();
    msgl->set_address(sglAddress);
    log.serialize(*msgl->mutable_log());
    commit();
    // Record completion in scheduler global lock
    sgl.setOwner(getAddressIfSet());
    sgl.setBackupOwner(getAddressIfSet());
    sgl.commit();
    //... and clean up the agent
    agent.removeFromOwnership(sglAddress);
    agent.commit();
    return sglAddress;
  }
}

581
void RootEntry::removeSchedulerGlobalLockAndCommit() {
582
583
  checkPayloadWritable();
  // Get the address of the scheduler lock (nothing to do if there is none)
584
585
  if (!m_payload.has_schedulerlockpointer() ||
      !m_payload.schedulerlockpointer().address().size())
586
587
    return;
  std::string sglAddress = m_payload.schedulerlockpointer().address();
588
  SchedulerGlobalLock sgl(sglAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
  ScopedExclusiveLock sgll(sgl);
  sgl.fetch();
  // Check the drive register is empty
  if (!sgl.isEmpty()) {
    throw DriveRegisterNotEmpty("In RootEntry::removeSchedulerGlobalLockAndCommit: "
      "trying to remove a non-empty scheduler global lock");
  }
  // we can delete the drive register
  sgl.remove();
  // And update the root entry
  m_payload.mutable_schedulerlockpointer()->set_address("");
  // We commit for safety and symmetry with the add operation
  commit();
}


605
// Dump the root entry
606
std::string RootEntry::dump () {  
607
  checkPayloadReadable();
608
  std::stringstream ret;
609
610
611
612
613
614
615
616
617
618
619
620
  ret << "RootEntry" << std::endl;
  struct json_object * jo = json_object_new_object();
  
  json_object_object_add(jo, "agentregisterintent", json_object_new_string(m_payload.agentregisterintent().c_str()));
  
  {
    json_object * pointer = json_object_new_object();
    json_object_object_add(pointer, "address", json_object_new_string(m_payload.driveregisterpointer().address().c_str()));
    json_object * jlog = json_object_new_object();
    json_object_object_add(jlog, "host", json_object_new_string(m_payload.driveregisterpointer().log().host().c_str()));
    json_object_object_add(jlog, "time", json_object_new_int64(m_payload.driveregisterpointer().log().time()));
    json_object * id = json_object_new_object();
621
    json_object_object_add(id, "name", json_object_new_string(m_payload.driveregisterpointer().log().username().c_str()));
622
623
624
625
626
627
628
629
630
631
632
    json_object_object_add(jlog, "user", id);
    json_object_object_add(pointer, "log", jlog);
    json_object_object_add(jo, "driveregisterpointer", pointer);
  }
  {
    json_object * pointer = json_object_new_object();
    json_object_object_add(pointer, "address", json_object_new_string(m_payload.agentregisterpointer().address().c_str()));
    json_object * jlog = json_object_new_object();
    json_object_object_add(jlog, "host", json_object_new_string(m_payload.agentregisterpointer().log().host().c_str()));
    json_object_object_add(jlog, "time", json_object_new_int64(m_payload.agentregisterpointer().log().time()));
    json_object * id = json_object_new_object();
633
    json_object_object_add(id, "name", json_object_new_string(m_payload.driveregisterpointer().log().username().c_str()));
634
635
636
637
638
639
640
641
642
643
644
    json_object_object_add(jlog, "user", id);
    json_object_object_add(pointer, "log", jlog);
    json_object_object_add(jo, "agentregisterpointer", pointer);
  }
  {
    json_object * pointer = json_object_new_object();
    json_object_object_add(pointer, "address", json_object_new_string(m_payload.schedulerlockpointer().address().c_str()));
    json_object * jlog = json_object_new_object();
    json_object_object_add(jlog, "host", json_object_new_string(m_payload.schedulerlockpointer().log().host().c_str()));
    json_object_object_add(jlog, "time", json_object_new_int64(m_payload.schedulerlockpointer().log().time()));
    json_object * id = json_object_new_object();
645
    json_object_object_add(id, "name", json_object_new_string(m_payload.driveregisterpointer().log().username().c_str()));
646
647
648
649
650
651
    json_object_object_add(jlog, "user", id);
    json_object_object_add(pointer, "log", jlog);
    json_object_object_add(jo, "schedulerlockpointer", pointer);
  }
  {
    json_object * array = json_object_new_array();
652
    for (auto & i :m_payload.archivequeuepointers()) {
653
654
      json_object * jot = json_object_new_object();

655
656
      json_object_object_add(jot, "name", json_object_new_string(i.name().c_str())); 
      json_object_object_add(jot, "address", json_object_new_string(i.address().c_str()));  
657
658
659

      json_object_array_add(array, jot);
    }
660
    json_object_object_add(jo, "archivequeuepointers", array);
661
662
663
664
  }
  
  ret << json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY) << std::endl;
  json_object_put(jo);
665
666
  return ret.str();
}
667
668

}} // namespace cta::objectstore