/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>

#include <ctime>
#include <list>
#include <sstream>
#include <string>

#include "BackendVFS.hpp"
#include "common/exception/Exception.hpp"
#include "common/dataStructures/ArchiveFile.hpp"
#include "common/log/DummyLogger.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
#endif
#include "GarbageCollector.hpp"
#include "RootEntry.hpp"
#include "Agent.hpp"
#include "AgentReference.hpp"
#include "AgentRegister.hpp"
#include "DriveRegister.hpp"
#include "ArchiveRequest.hpp"
#include "RetrieveRequest.hpp"
#include "ArchiveQueue.hpp"
#include "RetrieveQueue.hpp"
#include "EntryLogSerDeser.hpp"
#include "catalogue/DummyCatalogue.hpp"

namespace unitTests {

44
TEST(ObjectStore, GarbageCollectorBasicFuctionnality) {
45
46
  // We will need a log object 
  cta::log::DummyLogger dl("unitTest");
47
  cta::catalogue::DummyCatalogue catalogue(dl);
48
  cta::log::LogContext lc(dl);
49
50
  // Here we check for the ability to detect dead (but empty agents)
  // and clean them up.
51
  cta::objectstore::BackendVFS be;
52
  cta::objectstore::AgentReference agentRef("unitTestGarbageCollector", dl);
53
  cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
54
55
56
57
58
  // Create the root entry
  cta::objectstore::RootEntry re(be);
  re.initialize();
  re.insert();
  // Create the agent register
59
    cta::objectstore::EntryLogSerDeser el("user0",
60
      "unittesthost", time(NULL));
61
  cta::objectstore::ScopedExclusiveLock rel(re);
62
  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
63
  rel.release();
64
  // Create 2 agents, A and B and register them
65
66
  // The agents are set with a timeout of 0, so they will be delclared
  // dead immediately.
67
  cta::objectstore::AgentReference agrA("unitTestAgentA", dl), agrB("unitTestAgentB", dl);
68
  cta::objectstore::Agent agA(agrA.getAgentAddress(), be), agB(agrB.getAgentAddress(), be);
69
  agA.initialize();
70
  agA.setTimeout_us(0);
71
  agA.insertAndRegisterSelf(lc);
72
  agB.initialize();
73
  agB.setTimeout_us(0);
74
  agB.insertAndRegisterSelf(lc);
75
  // Create the garbage colletor and run it twice.
76
  cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
77
  cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
78
  gcAgent.initialize();
79
  gcAgent.setTimeout_us(0);
80
  gcAgent.insertAndRegisterSelf(lc);
81
  {
82
    cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
83
84
    gc.runOnePass(lc);
    gc.runOnePass(lc);
85
86
87
  }
  // Unregister gc's agent
  cta::objectstore::ScopedExclusiveLock gcal(gcAgent);
88
  gcAgent.fetch();
89
  gcAgent.removeAndUnregisterSelf(lc);
90
91
92
  // We should not be able to remove the agent register (as it should be empty)
  rel.lock(re);
  re.fetch();
93
94
  ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
  ASSERT_NO_THROW(re.removeIfEmpty(lc));
95
96
}

97
TEST(ObjectStore, GarbageCollectorRegister) {
98
99
100
  // We will need a log object 
  cta::log::DummyLogger dl("unitTest");
  cta::log::LogContext lc(dl);
101
  cta::catalogue::DummyCatalogue catalogue(dl);
102
103
  // Here we check that can successfully call agentRegister's garbage collector
  cta::objectstore::BackendVFS be;
104
  cta::objectstore::AgentReference agentRef("unitTestGarbageCollector", dl);
105
  cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
106
107
108
109
110
  // Create the root entry
  cta::objectstore::RootEntry re(be);
  re.initialize();
  re.insert();
  // Create the agent register
111
    cta::objectstore::EntryLogSerDeser el("user0",
112
      "unittesthost", time(NULL));
113
  cta::objectstore::ScopedExclusiveLock rel(re);
114
  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
115
116
  rel.release();
  // Create an agent and add and agent register to it as an owned object
117
  cta::objectstore::AgentReference agrA("unitTestAgentA", dl);
118
  cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
119
120
  agA.initialize();
  agA.setTimeout_us(0);
121
  agA.insertAndRegisterSelf(lc);
122
123
124
125
  // Create a new agent register, owned by agA (by hand as it is not an usual
  // situation)
  std::string arName;
  {
126
    arName = agrA.nextId("AgentRegister");
127
128
    cta::objectstore::AgentRegister ar(arName, be);
    ar.initialize();
129
130
    ar.setOwner(agrA.getAgentAddress());
    agrA.addToOwnership(arName, be);
131
132
133
    ar.insert();
  }
  // Create the garbage colletor and run it twice.
134
  cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
135
  cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
136
137
  gcAgent.initialize();
  gcAgent.setTimeout_us(0);
138
  gcAgent.insertAndRegisterSelf(lc);
139
  {
140
    cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
141
142
    gc.runOnePass(lc);
    gc.runOnePass(lc);
143
  }
144
  ASSERT_FALSE(be.exists(arName));
145
146
  // Unregister gc's agent
  cta::objectstore::ScopedExclusiveLock gcal(gcAgent);
147
  gcAgent.fetch();
148
  gcAgent.removeAndUnregisterSelf(lc);
149
150
151
  // We should not be able to remove the agent register (as it should be empty)
  rel.lock(re);
  re.fetch();
152
153
  ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
  ASSERT_NO_THROW(re.removeIfEmpty(lc));
154
155
}

156
TEST(ObjectStore, GarbageCollectorArchiveQueue) {
157
158
159
  // We will need a log object 
  cta::log::DummyLogger dl("unitTest");
  cta::log::LogContext lc(dl);
160
161
  // We need a dummy catalogue
  cta::catalogue::DummyCatalogue catalogue(dl);
162
163
  // Here we check that can successfully call agentRegister's garbage collector
  cta::objectstore::BackendVFS be;
164
  cta::objectstore::AgentReference agentRef("unitTestGarbageCollector", dl);
165
  cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
166
167
168
169
170
  // Create the root entry
  cta::objectstore::RootEntry re(be);
  re.initialize();
  re.insert();
  // Create the agent register
171
    cta::objectstore::EntryLogSerDeser el("user0",
172
      "unittesthost", time(NULL));
173
  cta::objectstore::ScopedExclusiveLock rel(re);
174
  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
175
176
  rel.release();
  // Create an agent and add and agent register to it as an owned object
177
  cta::objectstore::AgentReference agrA("unitTestAgentA", dl);
178
  cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
179
180
  agA.initialize();
  agA.setTimeout_us(0);
181
  agA.insertAndRegisterSelf(lc);
182
183
184
185
  // Create a new agent register, owned by agA (by hand as it is not an usual
  // situation)
  std::string tpName;
  {
186
    tpName = agrA.nextId("ArchiveQueue");
187
188
189
    cta::objectstore::ArchiveQueue aq(tpName, be);
    aq.initialize("SomeTP");
    aq.setOwner(agA.getAddressIfSet());
190
    agrA.addToOwnership(tpName, be);
191
    aq.insert();
192
193
  }
  // Create the garbage colletor and run it twice.
194
  cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
195
  cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
196
197
  gcAgent.initialize();
  gcAgent.setTimeout_us(0);
198
  gcAgent.insertAndRegisterSelf(lc);
199
  {
200
    cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
201
202
    gc.runOnePass(lc);
    gc.runOnePass(lc);
203
204
205
206
  }
  ASSERT_FALSE(be.exists(tpName));
  // Unregister gc's agent
  cta::objectstore::ScopedExclusiveLock gcal(gcAgent);
207
  gcAgent.fetch();
208
  gcAgent.removeAndUnregisterSelf(lc);
209
210
211
  // We should not be able to remove the agent register (as it should be empty)
  rel.lock(re);
  re.fetch();
212
213
  ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
  ASSERT_NO_THROW(re.removeIfEmpty(lc));
214
215
}

216
TEST(ObjectStore, GarbageCollectorDriveRegister) {
217
218
219
  // We will need a log object 
  cta::log::DummyLogger dl("unitTest");
  cta::log::LogContext lc(dl);
220
221
  // We need a dummy catalogue
  cta::catalogue::DummyCatalogue catalogue(dl);
222
223
  // Here we check that can successfully call agentRegister's garbage collector
  cta::objectstore::BackendVFS be;
224
  cta::objectstore::AgentReference agentRef("unitTestGarbageCollector", dl);
225
  cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
226
227
228
229
230
  // Create the root entry
  cta::objectstore::RootEntry re(be);
  re.initialize();
  re.insert();
  // Create the agent register
231
    cta::objectstore::EntryLogSerDeser el("user0",
232
      "unittesthost", time(NULL));
233
  cta::objectstore::ScopedExclusiveLock rel(re);
234
  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
235
  rel.release();
236
  // Create an agent and add the drive register to it as an owned object
237
  cta::objectstore::AgentReference agrA("unitTestAgentA", dl);
238
  cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
239
240
  agA.initialize();
  agA.setTimeout_us(0);
241
  agA.insertAndRegisterSelf(lc);
242
  // Create a new drive register, owned by agA (by hand as it is not an usual
243
244
245
  // situation)
  std::string tpName;
  {
246
    tpName = agrA.nextId("ArchiveQueue");
247
248
249
    cta::objectstore::DriveRegister dr(tpName, be);
    dr.initialize();
    dr.setOwner(agA.getAddressIfSet());
250
    agrA.addToOwnership(tpName, be);
251
252
253
    dr.insert();
  }
  // Create the garbage colletor and run it twice.
254
  cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
255
  cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
256
257
  gcAgent.initialize();
  gcAgent.setTimeout_us(0);
258
  gcAgent.insertAndRegisterSelf(lc);
259
  {
260
    cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
261
262
    gc.runOnePass(lc);
    gc.runOnePass(lc);
263
264
265
266
  }
  ASSERT_FALSE(be.exists(tpName));
  // Unregister gc's agent
  cta::objectstore::ScopedExclusiveLock gcal(gcAgent);
267
  gcAgent.fetch();
268
  gcAgent.removeAndUnregisterSelf(lc);
269
270
271
  // We should not be able to remove the agent register (as it should be empty)
  rel.lock(re);
  re.fetch();
272
273
  ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
  ASSERT_NO_THROW(re.removeIfEmpty(lc));
274
275
}

276
TEST(ObjectStore, GarbageCollectorArchiveRequest) {
277
  // We will need a log object 
278
279
280
#ifdef STDOUT_LOGGING
  cta::log::StdoutLogger dl("unitTest");
#else
281
  cta::log::DummyLogger dl("unitTest");
282
#endif
283
  cta::log::LogContext lc(dl);
284
285
  // We need a dummy catalogue
  cta::catalogue::DummyCatalogue catalogue(dl);
286
  // Here we check that can successfully call ArchiveRequests's garbage collector
287
288
289
290
291
292
  cta::objectstore::BackendVFS be;
  // Create the root entry
  cta::objectstore::RootEntry re(be);
  re.initialize();
  re.insert();
  // Create the agent register
293
    cta::objectstore::EntryLogSerDeser el("user0",
294
      "unittesthost", time(NULL));
295
  cta::objectstore::ScopedExclusiveLock rel(re);
296
  // Create the agent for objects creation
297
  cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
298
  // Finish root creation.
299
  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
300
  rel.release();
301
302
303
304
  // continue agent creation.
  cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
  agent.initialize();
  agent.setTimeout_us(0);
305
  agent.insertAndRegisterSelf(lc);
306
  // Create an agent to garbage collected
307
  cta::objectstore::AgentReference agrA("unitTestAgentA", dl);
308
  cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
309
310
  agA.initialize();
  agA.setTimeout_us(0);
311
  agA.insertAndRegisterSelf(lc);
312
  // Several use cases are present for the ArchiveRequests:
313
314
315
316
317
  // - just referenced in agent ownership list, but not yet created.
  // - just created but not linked to any tape pool
  // - partially linked to tape pools
  // - linked to all tape pools
  // - In the 2 latter cases, the job could have been picked up for processing
318
319
  //
  // Create 2 archive queues
320
321
322
  std::string tpAddr[2];
  for (int i=0; i<2; i++)
  {
323
324
325
326
327
    cta::objectstore::RootEntry re(be);
    cta::objectstore::ScopedExclusiveLock rel(re);
    re.fetch();
    std::stringstream tapePoolName;
    tapePoolName << "TapePool" << i;
328
    tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, lc);
329
    cta::objectstore::ArchiveQueue aq(tpAddr[i], be);
330
331
  }
  // Create the various ATFR's, stopping one step further each time.
332
  unsigned int pass=0;
333
334
335
  while (true)
  {
    // -just referenced
336
    std::string atfrAddr = agrA.nextId("ArchiveRequest");
337
    agrA.addToOwnership(atfrAddr, be);
338
    if (pass < 1) { pass++; continue; }
339
340
    // - created, but not linked to tape pools. Those jobs will be queued by the garbage
    // collector.
341
342
    cta::objectstore::ArchiveRequest ar(atfrAddr, be);
    ar.initialize();
343
344
345
346
347
348
349
350
351
352
353
354
    cta::common::dataStructures::ArchiveFile aFile;
    aFile.archiveFileID = 123456789L;
    aFile.diskFileId = "eos://diskFile";
    aFile.checksumType = "";
    aFile.checksumValue = "";
    aFile.creationTime = 0;
    aFile.reconciliationTime = 0;
    aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
    aFile.diskInstance = "eoseos";
    aFile.fileSize = 667;
    aFile.storageClass = "sc";
    ar.setArchiveFile(aFile);
355
356
    ar.addJob(1, "TapePool0", tpAddr[0], 1, 1);
    ar.addJob(2, "TapePool1", tpAddr[1], 1, 1);    
357
    ar.setOwner(agA.getAddressIfSet());
358
359
    cta::common::dataStructures::MountPolicy mp;
    ar.setMountPolicy(mp);
360
    ar.setArchiveReportURL("");
361
362
    ar.setRequester(cta::common::dataStructures::UserIdentity("user0", "group0"));
    ar.setSrcURL("root://eoseos/myFile");
363
    ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
364
365
    ar.insert();
    cta::objectstore::ScopedExclusiveLock atfrl(ar);
366
    if (pass < 2) { pass++; continue; }
367
    // - Change the jobs statuses from PendingNSCreation to LinkingToArchiveQueue.
368
369
370
    // They will be automatically connected to the tape pool by the garbage 
    // collector from that moment on.
    {
371
372
      ar.setAllJobsLinkingToArchiveQueue();
      ar.commit();
373
374
    }
    if (pass < 3) { pass++; continue; }
375
376
    // - Referenced in the first tape pool
    {
377
378
379
380
      cta::objectstore::ArchiveQueue aq(tpAddr[0], be);
      cta::objectstore::ScopedExclusiveLock tpl(aq);
      aq.fetch();
      cta::objectstore::ArchiveRequest::JobDump jd;
381
382
      jd.copyNb = 1;
      jd.tapePool = "TapePool0";
383
      jd.owner = tpAddr[0];
384
385
386
387
      cta::common::dataStructures::MountPolicy policy;
      policy.archiveMinRequestAge = 0;
      policy.archivePriority = 1;
      policy.maxDrivesAllowed = 1;
388
389
      std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta;
      jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000U+pass, policy, time(NULL)});
390
      aq.addJobsAndCommit(jta, agentRef, lc);
391
    }
392
    if (pass < 4) { pass++; continue; }
393
394
395
    // TODO: partially migrated or selected
    // - Referenced in the second tape pool
    {
396
397
398
399
      cta::objectstore::ArchiveQueue aq(tpAddr[1], be);
      cta::objectstore::ScopedExclusiveLock tpl(aq);
      aq.fetch();
      cta::objectstore::ArchiveRequest::JobDump jd;
400
401
      jd.copyNb = 2;
      jd.tapePool = "TapePool1";
402
      jd.owner = tpAddr[1];
403
404
405
406
      cta::common::dataStructures::MountPolicy policy;
      policy.archiveMinRequestAge = 0;
      policy.archivePriority = 1;
      policy.maxDrivesAllowed = 1;
407
408
      std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta;
      jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)});
409
      aq.addJobsAndCommit(jta, agentRef, lc);
410
    }
411
    if (pass < 5) { pass++; continue; }
412
413
    // - Still marked a not owned but referenced in the agent
    {
414
415
      ar.setOwner("");
      ar.commit();
416
417
418
419
    }
    break;
  }
  // Create the garbage collector and run it twice.
420
  cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
421
  cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
422
423
  gcAgent.initialize();
  gcAgent.setTimeout_us(0);
424
  gcAgent.insertAndRegisterSelf(lc);
425
  {
426
    cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
427
428
    gc.runOnePass(lc);
    gc.runOnePass(lc);
429
430
431
  }
  // All 4 requests should be linked in both tape pools
  {
432
433
434
435
436
437
438
439
    cta::objectstore::ArchiveQueue aq0(tpAddr[0], be);
    cta::objectstore::ScopedExclusiveLock tp0lock(aq0);
    aq0.fetch();
    auto d0=aq0.dumpJobs();
    cta::objectstore::ArchiveQueue aq1(tpAddr[1], be);
    cta::objectstore::ScopedExclusiveLock tp1lock(aq1);
    aq1.fetch();
    auto d1=aq1.dumpJobs();
440
441
    // We expect all jobs with sizes 1002-1005 inclusive to be connected to
    // their respective tape pools.
442
443
    ASSERT_EQ(5, aq0.getJobsSummary().jobs);
    ASSERT_EQ(5, aq1.getJobsSummary().jobs);
444
445
446
  }
  // Unregister gc's agent
  cta::objectstore::ScopedExclusiveLock gcal(gcAgent);
447
  gcAgent.fetch();
448
  gcAgent.removeAndUnregisterSelf(lc);
449
450
451
  // We should not be able to remove the agent register (as it should be empty)
  rel.lock(re);
  re.fetch();
452
453
454
455
456
457
458
  // Remove jobs from archive queues
  std::list<std::string> tapePools = { "TapePool0", "TapePool1" };
  for (auto & tp: tapePools) {
    // Empty queue
    cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp), be);
    cta::objectstore::ScopedExclusiveLock aql(aq);
    aq.fetch();
459
    std::list<std::string> ajtr;
460
    for (auto &j: aq.dumpJobs()) {
461
      ajtr.push_back(j.address);
462
    }
463
    aq.removeJobsAndCommit(ajtr);
464
465
    aql.release();
    // Remove queues from root
466
    re.removeArchiveQueueAndCommit(tp, lc);
467
468
  }

469
470
  ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
  ASSERT_NO_THROW(re.removeIfEmpty(lc));
471
472
  // TODO: this unit test still leaks tape pools and requests
}
473

474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
  // We will need a log object
#ifdef STDOUT_LOGGING
  cta::log::StdoutLogger dl("unitTest");
#else
  cta::log::DummyLogger dl("unitTest");
#endif
  cta::log::LogContext lc(dl);
  // We need a dummy catalogue
  cta::catalogue::DummyCatalogue catalogue(dl);
  // Here we check that can successfully call RetrieveRequests's garbage collector
  cta::objectstore::BackendVFS be;
  // Create the root entry
  cta::objectstore::RootEntry re(be);
  re.initialize();
  re.insert();
  // Create the agent register
  cta::objectstore::EntryLogSerDeser el("user0",
      "unittesthost", time(NULL));
  cta::objectstore::ScopedExclusiveLock rel(re);
  // Create the agent for objects creation
495
  cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
496
  // Finish root creation.
497
  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
498
499
500
501
502
  rel.release();
  // continue agent creation.
  cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
  agent.initialize();
  agent.setTimeout_us(0);
503
  agent.insertAndRegisterSelf(lc);
504
  // Create an agent to garbage be collected
505
  cta::objectstore::AgentReference agrA("unitTestAgentA", dl);
506
507
508
  cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
  agA.initialize();
  agA.setTimeout_us(0);
509
  agA.insertAndRegisterSelf(lc);
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
  // Several use cases are present for the RetrieveRequests:
  // - just referenced in agent ownership list, but not yet created.
  // - just created but not linked to any tape
  // - partially linked to tape
  // - When requeueing the request, the tape could be disabled, in which case
  //   it will be deleted.
  //
  // Create 2 retrieve queues
  std::string tAddr[2];
  for (int i=0; i<2; i++)
  {
    cta::objectstore::RootEntry re(be);
    cta::objectstore::ScopedExclusiveLock rel(re);
    re.fetch();
    std::stringstream vid;
    vid << "Tape" << i;
    tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef);
    cta::objectstore::RetrieveQueue rq(tAddr[i], be);
  }
  // Create the various ATFR's, stopping one step further each time.
  int pass=0;
  while (true)
  {
    // -just referenced
    std::string atfrAddr = agrA.nextId("RetrieveRequest");
    agrA.addToOwnership(atfrAddr, be);
    if (pass < 1) { pass++; continue; }
    // - created, but not linked to tape pools. Those jobs will be queued by the garbage
    // collector.
    cta::objectstore::RetrieveRequest rr(atfrAddr, be);
    rr.initialize();
    cta::common::dataStructures::RetrieveFileQueueCriteria rqc;
    rqc.archiveFile.archiveFileID = 123456789L;
    rqc.archiveFile.diskFileId = "eos://diskFile";
    rqc.archiveFile.checksumType = "";
    rqc.archiveFile.checksumValue = "";
    rqc.archiveFile.creationTime = 0;
    rqc.archiveFile.reconciliationTime = 0;
    rqc.archiveFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
    rqc.archiveFile.diskInstance = "eoseos";
550
    rqc.archiveFile.fileSize = 1000 + pass;
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
    rqc.archiveFile.storageClass = "sc";
    rqc.archiveFile.tapeFiles[1].blockId=0;
    rqc.archiveFile.tapeFiles[1].compressedSize=1;
    rqc.archiveFile.tapeFiles[1].compressedSize=1;
    rqc.archiveFile.tapeFiles[1].copyNb=1;
    rqc.archiveFile.tapeFiles[1].creationTime=time(nullptr);
    rqc.archiveFile.tapeFiles[1].fSeq=pass;
    rqc.archiveFile.tapeFiles[1].vid="Tape0";
    rqc.archiveFile.tapeFiles[2].blockId=0;
    rqc.archiveFile.tapeFiles[2].compressedSize=1;
    rqc.archiveFile.tapeFiles[2].compressedSize=1;
    rqc.archiveFile.tapeFiles[2].copyNb=2;
    rqc.archiveFile.tapeFiles[2].creationTime=time(nullptr);
    rqc.archiveFile.tapeFiles[2].fSeq=pass;
    rqc.archiveFile.tapeFiles[2].vid="Tape1";
    rqc.mountPolicy.archiveMinRequestAge = 1;
    rqc.mountPolicy.archivePriority = 1;
    rqc.mountPolicy.creationLog.time = time(nullptr);
    rqc.mountPolicy.lastModificationLog.time = time(nullptr);
    rqc.mountPolicy.maxDrivesAllowed = 1;
    rqc.mountPolicy.retrieveMinRequestAge = 1;
    rqc.mountPolicy.retrievePriority = 1;
    rr.setRetrieveFileQueueCriteria(rqc);
    cta::common::dataStructures::RetrieveRequest sReq;
    sReq.archiveFileID = rqc.archiveFile.archiveFileID;
    sReq.creationLog.time=time(nullptr);
    rr.setSchedulerRequest(sReq);
    rr.addJob(1, 1, 1);
    rr.addJob(2, 1, 1);    
    rr.setOwner(agA.getAddressIfSet());
    rr.setActiveCopyNumber(0);
    rr.insert();
    cta::objectstore::ScopedExclusiveLock rrl(rr);
    if (pass < 3) { pass++; continue; }
    // - Reference job in the first tape
    {
      cta::objectstore::RetrieveQueue rq(tAddr[0], be);
      cta::objectstore::ScopedExclusiveLock rql(rq);
      rq.fetch();
590
591
      std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta;
      jta.push_back({1,rqc.archiveFile.tapeFiles[1].fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time});
592
      rq.addJobsAndCommit(jta);
593
594
    }
    if (pass < 5) { pass++; continue; }
595
    // - Still marked as not owned but referenced in the agent
596
597
598
599
600
601
602
603
604
    {
      rr.setOwner(tAddr[0]);
      rr.setActiveCopyNumber(1);
      rr.commit();
    }
    break;
  }
  // Mark the tape as enabled
  catalogue.addEnabledTape("Tape0");
605
606
  // Mark the other tape as disabled
  catalogue.addDisabledTape("Tape1");
607
  // Create the garbage collector and run it twice.
608
  cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
609
610
611
  cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
  gcAgent.initialize();
  gcAgent.setTimeout_us(0);
612
  gcAgent.insertAndRegisterSelf(lc);
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
  {
    cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
    gc.runOnePass(lc);
    gc.runOnePass(lc);
  }
  // All 4 requests should be linked in the first tape queue
  {
    cta::objectstore::RetrieveQueue rq(tAddr[0], be);
    cta::objectstore::ScopedExclusiveLock tp0lock(rq);
    rq.fetch();
    auto dump=rq.dumpJobs();
    // We expect all jobs with sizes 1002-1005 inclusive to be connected to
    // their respective tape pools.
    ASSERT_EQ(5, rq.getJobsSummary().files);
  }
  // Unregister gc's agent
  cta::objectstore::ScopedExclusiveLock gcal(gcAgent);
  gcAgent.fetch();
631
  gcAgent.removeAndUnregisterSelf(lc);
632
633
634
635
636
637
638
639
640
641
  // We should not be able to remove the agent register (as it should be empty)
  rel.lock(re);
  re.fetch();
  // Remove jobs from retrieve queue
  std::list<std::string> retrieveQueues = { "Tape0", "Tape1" };
  for (auto & vid: retrieveQueues) {
    // Empty queue
    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid), be);
    cta::objectstore::ScopedExclusiveLock rql(rq);
    rq.fetch();
642
    std::list<std::string> jtrl;
643
    for (auto &j: rq.dumpJobs()) {
644
      jtrl.push_back(j.address);
645
    }
646
    rq.removeJobsAndCommit(jtrl);
647
648
    rql.release();
    // Remove queues from root
649
    re.removeRetrieveQueueAndCommit(vid, lc);
650
651
  }

652
653
  ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
  ASSERT_NO_THROW(re.removeIfEmpty(lc));
654
655
656
  // TODO: this unit test still leaks tape pools and requests
}

657
}