/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogue/InMemoryCatalogue.hpp"
#include "catalogue/SchemaCreatingSqliteCatalogue.hpp"
#include "common/log/DummyLogger.hpp"
#include "common/make_unique.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "scheduler/MountRequest.hpp"
#include "scheduler/OStoreDB/OStoreDBFactory.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "scheduler/Scheduler.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/SchedulerDatabaseFactory.hpp"
#include "scheduler/TapeMount.hpp"
#include "tests/TempFile.hpp"
#include "objectstore/BackendRadosTestSwitch.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
#endif

#include <exception>
#include <gtest/gtest.h>
#include <memory>
#include <utility>

namespace unitTests {

namespace {

/**
 * This structure is used to parameterize scheduler tests.
 */
struct SchedulerTestParam {
  cta::SchedulerDatabaseFactory &dbFactory;

  SchedulerTestParam(
    cta::SchedulerDatabaseFactory &dbFactory):
    dbFactory(dbFactory) {
  }
}; // struct SchedulerTestParam

}

/**
 * The scheduler test is a parameterized test.  It takes a scheduler database
 * factory as a parameter.
 */
class SchedulerTest: public ::testing::TestWithParam<SchedulerTestParam> {
public:

  SchedulerTest(): m_dummyLog("dummy", "dummy") {
  }

  class FailedToGetCatalogue: public std::exception {
  public:
    const char *what() const throw() {
      return "Failed to get catalogue";
    }
  };

  class FailedToGetScheduler: public std::exception {
  public:
    const char *what() const throw() {
      return "Failed to get scheduler";
    }
  };
  
  class FailedToGetSchedulerDB: public std::exception {
  public:
    const char *what() const throw() {
      return "Failed to get object store db.";
    }
  };

  virtual void SetUp() {
    using namespace cta;

    const SchedulerTestParam &param = GetParam();
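    // Build a fresh scheduler database (from the parameterized factory), an
    // in-memory catalogue and a scheduler instance for every test.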
    m_db = param.dbFactory.create();
    const uint64_t nbConns = 1;
    const uint64_t nbArchiveFileListingConns = 1;
    //m_catalogue = cta::make_unique<catalogue::SchemaCreatingSqliteCatalogue>(m_tempSqliteFile.path(), nbConns);
    m_catalogue = cta::make_unique<catalogue::InMemoryCatalogue>(m_dummyLog, nbConns, nbArchiveFileListingConns);
    m_scheduler = cta::make_unique<Scheduler>(*m_catalogue, *m_db, 5, 2*1000*1000);
  }

  virtual void TearDown() {
    m_scheduler.reset();
    m_catalogue.reset();
    m_db.reset();
  }

  cta::catalogue::Catalogue &getCatalogue() {
    cta::catalogue::Catalogue *const ptr = m_catalogue.get();
    if(NULL == ptr) {
      throw FailedToGetCatalogue();
    }
    return *ptr;
  }
    
  cta::Scheduler &getScheduler() {
    cta::Scheduler *const ptr = m_scheduler.get();
    if(NULL == ptr) {
      throw FailedToGetScheduler();
    }
    return *ptr;
  }
  
  cta::SchedulerDatabase &getSchedulerDB() {
    cta::SchedulerDatabase *const ptr = m_db.get();
    if(NULL == ptr) {
      throw FailedToGetSchedulerDB();
    }
    return *ptr;
  }
  
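  /**
   * Populates the catalogue with the default mount policy, requester mount rule,
   * storage class, tape pool and archive route used by the tests below.
   */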
  void setupDefaultCatalogue() {
    using namespace cta;
    auto & catalogue=getCatalogue();

    const std::string mountPolicyName = "mount_group";
    const uint64_t archivePriority = 1;
    const uint64_t minArchiveRequestAge = 2;
    const uint64_t retrievePriority = 3;
    const uint64_t minRetrieveRequestAge = 4;
    const uint64_t maxDrivesAllowed = 5;
    const std::string mountPolicyComment = "create mount group";

    ASSERT_TRUE(catalogue.getMountPolicies().empty());

    catalogue.createMountPolicy(
      s_adminOnAdminHost,
      mountPolicyName,
      archivePriority,
      minArchiveRequestAge,
      retrievePriority,
      minRetrieveRequestAge,
      maxDrivesAllowed,
      mountPolicyComment);

    const std::list<common::dataStructures::MountPolicy> groups = catalogue.getMountPolicies();
    ASSERT_EQ(1, groups.size());
    const common::dataStructures::MountPolicy group = groups.front();
    ASSERT_EQ(mountPolicyName, group.name);
    ASSERT_EQ(archivePriority, group.archivePriority);
    ASSERT_EQ(minArchiveRequestAge, group.archiveMinRequestAge);
    ASSERT_EQ(retrievePriority, group.retrievePriority);
    ASSERT_EQ(minRetrieveRequestAge, group.retrieveMinRequestAge);
    ASSERT_EQ(maxDrivesAllowed, group.maxDrivesAllowed);
    ASSERT_EQ(mountPolicyComment, group.comment);

    const std::string ruleComment = "create requester mount-rule";
    cta::common::dataStructures::UserIdentity userIdentity;
    catalogue.createRequesterMountRule(s_adminOnAdminHost, mountPolicyName, s_diskInstance, s_userName, ruleComment);

    const std::list<common::dataStructures::RequesterMountRule> rules = catalogue.getRequesterMountRules();
    ASSERT_EQ(1, rules.size());

    const common::dataStructures::RequesterMountRule rule = rules.front();

    ASSERT_EQ(s_userName, rule.name);
    ASSERT_EQ(mountPolicyName, rule.mountPolicy);
    ASSERT_EQ(ruleComment, rule.comment);
    ASSERT_EQ(s_adminOnAdminHost.username, rule.creationLog.username);
    ASSERT_EQ(s_adminOnAdminHost.host, rule.creationLog.host);
    ASSERT_EQ(rule.creationLog, rule.lastModificationLog);

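    // Create the storage class, tape pool and archive route needed to queue and
    // route archive requests.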
    common::dataStructures::StorageClass storageClass;
    storageClass.diskInstance = s_diskInstance;
    storageClass.name = s_storageClassName;
    storageClass.nbCopies = 1;
    storageClass.comment = "create storage class";
    m_catalogue->createStorageClass(s_adminOnAdminHost, storageClass);

    const uint16_t nbPartialTapes = 1;
    const std::string tapePoolComment = "Tape-pool comment";
    const std::string vo = "vo";
    const bool tapePoolEncryption = false;
    catalogue.createTapePool(s_adminOnAdminHost, s_tapePoolName, vo, nbPartialTapes, tapePoolEncryption, tapePoolComment);
    const uint16_t copyNb = 1;
    const std::string archiveRouteComment = "Archive-route comment";
    catalogue.createArchiveRoute(s_adminOnAdminHost, s_diskInstance, s_storageClassName, copyNb, s_tapePoolName,
      archiveRouteComment);
  }

private:

  // Prevent copying
  SchedulerTest(const SchedulerTest &) = delete;

  // Prevent assignment
  SchedulerTest & operator= (const SchedulerTest &) = delete;

  cta::log::DummyLogger m_dummyLog;
  std::unique_ptr<cta::SchedulerDatabase> m_db;
  std::unique_ptr<cta::catalogue::Catalogue> m_catalogue;
  std::unique_ptr<cta::Scheduler> m_scheduler;
  
protected:
  // Default parameters for storage classes, etc...
  const std::string s_userName = "user_name";
  const std::string s_diskInstance = "disk_instance";
  const std::string s_storageClassName = "TestStorageClass";
  const cta::common::dataStructures::SecurityIdentity s_adminOnAdminHost = { "admin1", "host1" };
  const std::string s_tapePoolName = "TestTapePool";
  const std::string s_libraryName = "TestLogicalLibrary";
  const std::string s_vid = "TestVid";
  const std::string s_mediaType = "TestMediaType";
  const std::string s_vendor = "TestVendor";
  //TempFile m_tempSqliteFile;

}; // class SchedulerTest

TEST_P(SchedulerTest, archive_to_new_file) {
  using namespace cta;

  setupDefaultCatalogue();
  Scheduler &scheduler = getScheduler();
  
  cta::common::dataStructures::EntryLog creationLog;
  creationLog.host="host2";
  creationLog.time=0;
  creationLog.username="admin1";
  cta::common::dataStructures::DiskFileInfo diskFileInfo;
  diskFileInfo.recoveryBlob="blob";
  diskFileInfo.group="group2";
  diskFileInfo.owner="cms_user";
  diskFileInfo.path="path/to/file";
  cta::common::dataStructures::ArchiveRequest request;
  request.checksumType="ADLER32";
  request.checksumValue="1111";
  request.creationLog=creationLog;
  request.diskFileInfo=diskFileInfo;
  request.diskFileID="diskFileID";
  request.fileSize=100*1000*1000;
  cta::common::dataStructures::UserIdentity requester;
  requester.name = s_userName;
  requester.group = "userGroup";
  request.requester = requester;
  request.srcURL="srcURL";
  request.storageClass=s_storageClassName;

  log::DummyLogger dl("", "");
  log::LogContext lc(dl);
  const uint64_t archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, request.storageClass,
      request.requester, lc);
  scheduler.queueArchiveWithGivenId(archiveFileId, s_diskInstance, request, lc);
  scheduler.waitSchedulerDbSubthreadsComplete();

  {
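    // The request should now be visible in the pending archive queue of its tape pool.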
    auto rqsts = scheduler.getPendingArchiveJobs(lc);
    ASSERT_EQ(1, rqsts.size());
    auto poolItor = rqsts.cbegin();
    ASSERT_FALSE(poolItor == rqsts.cend());
    const std::string pool = poolItor->first;
    ASSERT_TRUE(s_tapePoolName == pool);
    auto poolRqsts = poolItor->second;
    ASSERT_EQ(1, poolRqsts.size());
    std::set<std::string> remoteFiles;
    std::set<std::string> archiveFiles;
    for(auto rqstItor = poolRqsts.cbegin();
      rqstItor != poolRqsts.cend(); rqstItor++) {
      remoteFiles.insert(rqstItor->request.diskFileInfo.path);
    }
    ASSERT_EQ(1, remoteFiles.size());
    ASSERT_FALSE(remoteFiles.find(request.diskFileInfo.path) == remoteFiles.end());
  }
}

// smurray commented this test out on Mon 17 Jul 2017.  The test assumes that
// Scheduler::deleteArchive() calls SchedulerDatabase::deleteArchiveRequest().
// This fact is currently not true as Scheduler::deleteArchive() has been
// temporarily modified to only call Catalogue::deleteArchiveFile().
//
//TEST_P(SchedulerTest, delete_archive_request) {
//  using namespace cta;
//
//  Scheduler &scheduler = getScheduler();
//
//  setupDefaultCatalogue();
//
//  cta::common::dataStructures::EntryLog creationLog;
//  creationLog.host="host2";
//  creationLog.time=0;
//  creationLog.username="admin1";
//  cta::common::dataStructures::DiskFileInfo diskFileInfo;
//  diskFileInfo.recoveryBlob="blob";
//  diskFileInfo.group="group2";
//  diskFileInfo.owner="cms_user";
//  diskFileInfo.path="path/to/file";
//  cta::common::dataStructures::ArchiveRequest request;
//  request.checksumType="ADLER32";
//  request.checksumValue="1111";
//  request.creationLog=creationLog;
//  request.diskFileInfo=diskFileInfo;
//  request.diskFileID="diskFileID";
//  request.fileSize=100*1000*1000;
//  cta::common::dataStructures::UserIdentity requester;
//  requester.name = s_userName;
//  requester.group = "userGroup";
//  request.requester = requester;
//  request.srcURL="srcURL";
//  request.storageClass=s_storageClassName;
//
//  log::DummyLogger dl("");
//  log::LogContext lc(dl);
//  auto archiveFileId = scheduler.queueArchive(s_diskInstance, request, lc);
//
//  // Check that we have the file in the queues
//  // TODO: for this to work all the time, we need an index of all requests
//  // (otherwise we miss the selected ones).
//  // Could also be limited to querying by ID (global index needed)
//  bool found=false;
//  for (auto & tp: scheduler.getPendingArchiveJobs()) {
//    for (auto & req: tp.second) {
//      if (req.archiveFileID == archiveFileId)
//        found = true;
//    }
//  }
//  ASSERT_TRUE(found);
//
//  // Remove the request
//  cta::common::dataStructures::DeleteArchiveRequest dar;
//  dar.archiveFileID = archiveFileId;
//  dar.requester.group = "group1";
//  dar.requester.name = "user1";
//  scheduler.deleteArchive("disk_instance", dar);
//
//  // Validate that the request is gone.
//  found=false;
//  for (auto & tp: scheduler.getPendingArchiveJobs()) {
//    for (auto & req: tp.second) {
//      if (req.archiveFileID == archiveFileId)
//        found = true;
//    }
//  }
//  ASSERT_FALSE(found);
//}

TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
  using namespace cta;

  Scheduler &scheduler = getScheduler();
  auto &catalogue = getCatalogue();
  
  setupDefaultCatalogue();
#ifdef STDOUT_LOGGING
  log::StdoutLogger dl("dummy", "unitTest");
#else
  log::DummyLogger dl("", "");
#endif
  log::LogContext lc(dl);
  
  uint64_t archiveFileId;
  {
    // Queue an archive request.
    cta::common::dataStructures::EntryLog creationLog;
    creationLog.host="host2";
    creationLog.time=0;
    creationLog.username="admin1";
    cta::common::dataStructures::DiskFileInfo diskFileInfo;
    diskFileInfo.recoveryBlob="blob";
    diskFileInfo.group="group2";
    diskFileInfo.owner="cms_user";
    diskFileInfo.path="path/to/file";
    cta::common::dataStructures::ArchiveRequest request;
    request.checksumType="ADLER32";
    request.checksumValue="1234abcd";
    request.creationLog=creationLog;
    request.diskFileInfo=diskFileInfo;
    request.diskFileID="diskFileID";
    request.fileSize=100*1000*1000;
    cta::common::dataStructures::UserIdentity requester;
    requester.name = s_userName;
    requester.group = "userGroup";
    request.requester = requester;
    request.srcURL="srcURL";
    request.storageClass=s_storageClassName;
    archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, request.storageClass, request.requester, lc);
    scheduler.queueArchiveWithGivenId(archiveFileId, s_diskInstance, request, lc);
  }
  scheduler.waitSchedulerDbSubthreadsComplete();
  
  // Check that we have the file in the queues
  // TODO: for this to work all the time, we need an index of all requests
  // (otherwise we miss the selected ones).
  // Could also be limited to querying by ID (global index needed)
  bool found=false;
  for (auto & tp: scheduler.getPendingArchiveJobs(lc)) {
    for (auto & req: tp.second) {
      if (req.archiveFileID == archiveFileId)
        found = true;
    }
  }
  ASSERT_TRUE(found);

  // Create the environment for the migration to happen (library + tape) 
  const std::string libraryComment = "Library comment";
  catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName,
    libraryComment);
  {
    auto libraries = catalogue.getLogicalLibraries();
    ASSERT_EQ(1, libraries.size());
    ASSERT_EQ(s_libraryName, libraries.front().name);
    ASSERT_EQ(libraryComment, libraries.front().comment);
  }
  const uint64_t capacityInBytes = 12345678;
  const std::string tapeComment = "Tape comment";
  bool notDisabled = false;
  bool notFull = false;
  catalogue.createTape(s_adminOnAdminHost, s_vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes,
    notDisabled, notFull, tapeComment);

  const bool lbpIsOn = true;
  const std::string driveName = "tape_drive";

  catalogue.tapeLabelled(s_vid, driveName, lbpIsOn);

  {
    // Emulate a tape server by asking for a mount and then a file (and succeed
    // the transfer)
    std::unique_ptr<cta::TapeMount> mount;
    // This first initialization is normally done by the dataSession function.
    cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName };
    scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc);
    scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc);
    mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
    ASSERT_NE((cta::TapeMount*)NULL, mount.get());
    ASSERT_EQ(cta::common::dataStructures::MountType::Archive, mount.get()->getMountType());
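    // Cross-check the mount information as reported directly by the scheduler database.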
    auto & osdb=getSchedulerDB();
    auto mi=osdb.getMountInfo(lc);
    ASSERT_EQ(1, mi->existingOrNextMounts.size());
    ASSERT_EQ("TestTapePool", mi->existingOrNextMounts.front().tapePool);
    ASSERT_EQ("TestVid", mi->existingOrNextMounts.front().vid);
    std::unique_ptr<cta::ArchiveMount> archiveMount;
    archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()));
    ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get());
    std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
    ASSERT_NE((cta::ArchiveJob*)NULL, archiveJobBatch.front().get());
    std::unique_ptr<ArchiveJob> archiveJob = std::move(archiveJobBatch.front());
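    // Fill in the tape file metadata that a real tape session would set after writing the file.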
    archiveJob->tapeFile.blockId = 1;
    archiveJob->tapeFile.fSeq = 1;
    archiveJob->tapeFile.checksumType = "ADLER32";
    archiveJob->tapeFile.checksumValue = "1234abcd";
    archiveJob->tapeFile.compressedSize = archiveJob->archiveFile.fileSize;
    archiveJob->tapeFile.copyNb = 1;
    archiveJob->validate();
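    // Report the job as successfully written to tape; afterwards the mount should have no more work.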
    std::queue<std::unique_ptr<cta::ArchiveJob>> sDBarchiveJobBatch;
    std::queue<cta::catalogue::TapeItemWritten> sTapeItems;
    sDBarchiveJobBatch.emplace(std::move(archiveJob));
    archiveMount->reportJobsBatchWritten(sDBarchiveJobBatch, sTapeItems, lc);
    archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
    ASSERT_EQ(0, archiveJobBatch.size());
    archiveMount->complete();
  }

  {
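    // Queue a retrieve request for the file that was just archived.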
    cta::common::dataStructures::EntryLog creationLog;
    creationLog.host="host2";
    creationLog.time=0;
    creationLog.username="admin1";
    cta::common::dataStructures::DiskFileInfo diskFileInfo;
    diskFileInfo.recoveryBlob="blob";
    diskFileInfo.group="group2";
    diskFileInfo.owner="cms_user";
    diskFileInfo.path="path/to/file";
    cta::common::dataStructures::RetrieveRequest request;
    request.archiveFileID = archiveFileId;
    request.creationLog = creationLog;
    request.diskFileInfo = diskFileInfo;
    request.dstURL = "dstURL";
    request.requester.name = s_userName;
    request.requester.group = "userGroup";
    scheduler.queueRetrieve("disk_instance", request, lc);
    scheduler.waitSchedulerDbSubthreadsComplete();
  }

  // Check that the retrieve request is queued
  {
    auto rqsts = scheduler.getPendingRetrieveJobs(lc);
    // We expect 1 tape with queued jobs
    ASSERT_EQ(1, rqsts.size());
    // We expect the queue to contain 1 job
    ASSERT_EQ(1, rqsts.cbegin()->second.size());
    // We expect the job to be single copy
    auto & job = rqsts.cbegin()->second.back();
    ASSERT_EQ(1, job.tapeCopies.size());
    // We expect the copy to be on the provided tape.
    ASSERT_TRUE(s_vid == job.tapeCopies.cbegin()->first);
    // Check the remote target
    ASSERT_EQ("dstURL", job.request.dstURL);
    // Check the archive file ID
    ASSERT_EQ(archiveFileId, job.request.archiveFileID);

    // Check that we can retrieve jobs by VID

    // Get the vid from the above job and submit a separate request for the same vid
    auto vid = rqsts.begin()->second.back().tapeCopies.begin()->first;
    auto rqsts_vid = scheduler.getPendingRetrieveJobs(vid, lc);
    // same tests as above
    ASSERT_EQ(1, rqsts_vid.size());
    auto &job_vid = rqsts_vid.back();
    ASSERT_EQ(1, job_vid.tapeCopies.size());
    ASSERT_TRUE(s_vid == job_vid.tapeCopies.cbegin()->first);
    ASSERT_EQ("dstURL", job_vid.request.dstURL);
    ASSERT_EQ(archiveFileId, job_vid.request.archiveFileID);
  }
  
  {
    // Emulate a tape server by asking for a mount and then a file (and succeed
    // the transfer)
    std::unique_ptr<cta::TapeMount> mount;
    mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
    ASSERT_NE((cta::TapeMount*)NULL, mount.get());
    ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType());
    std::unique_ptr<cta::RetrieveMount> retrieveMount;
    retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
    ASSERT_NE((cta::RetrieveMount*)NULL, retrieveMount.get());
    std::unique_ptr<cta::RetrieveJob> retrieveJob;
    auto jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
    ASSERT_EQ(1, jobBatch.size());
    retrieveJob.reset(jobBatch.front().release());
    ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
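    // Report the retrieve job as successfully completed.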
    retrieveJob->asyncComplete();
    retrieveJob->checkComplete();
    jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
    ASSERT_EQ(0, jobBatch.size());
  }
}

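// Queue an archive request, then repeatedly fail the resulting job and check that it
// disappears from the queue once the maximum number of retries is reached.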
TEST_P(SchedulerTest, retry_archive_until_max_reached) {
  using namespace cta;
  
  setupDefaultCatalogue();

  auto &scheduler = getScheduler();
  auto &catalogue = getCatalogue();
  
#ifdef STDOUT_LOGGING
  log::StdoutLogger dl("dummy", "unitTest");
#else
  log::DummyLogger dl("", "");
#endif
  log::LogContext lc(dl);
  
  uint64_t archiveFileId;
  {
    // Queue an archive request.
    cta::common::dataStructures::EntryLog creationLog;
    creationLog.host="host2";
    creationLog.time=0;
    creationLog.username="admin1";
    cta::common::dataStructures::DiskFileInfo diskFileInfo;
    diskFileInfo.recoveryBlob="blob";
    diskFileInfo.group="group2";
    diskFileInfo.owner="cms_user";
    diskFileInfo.path="path/to/file";
    cta::common::dataStructures::ArchiveRequest request;
    request.checksumType="ADLER32";
    request.checksumValue="1111";
    request.creationLog=creationLog;
    request.diskFileInfo=diskFileInfo;
    request.diskFileID="diskFileID";
    request.fileSize=100*1000*1000;
    cta::common::dataStructures::UserIdentity requester;
    requester.name = s_userName;
    requester.group = "userGroup";
    request.requester = requester;
    request.srcURL="srcURL";
    request.storageClass=s_storageClassName;
    request.archiveErrorReportURL="null:";
    archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, request.storageClass, request.requester, lc);
    scheduler.queueArchiveWithGivenId(archiveFileId, s_diskInstance, request, lc);
  }
  scheduler.waitSchedulerDbSubthreadsComplete();
  
  // Create the environment for the migration to happen (library + tape) 
  const std::string libraryComment = "Library comment";
  catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName,
    libraryComment);
  {
    auto libraries = catalogue.getLogicalLibraries();
    ASSERT_EQ(1, libraries.size());
    ASSERT_EQ(s_libraryName, libraries.front().name);
    ASSERT_EQ(libraryComment, libraries.front().comment);
  }
  const uint64_t capacityInBytes = 12345678;
  const std::string tapeComment = "Tape comment";
  bool notDisabled = false;
  bool notFull = false;
  catalogue.createTape(s_adminOnAdminHost, s_vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes,
    notDisabled, notFull, tapeComment);

  const bool lbpIsOn = true;
  catalogue.tapeLabelled(s_vid, "tape_drive", lbpIsOn);

  {
    // Emulate a tape server by asking for a mount and then a file
    std::unique_ptr<cta::TapeMount> mount;
    mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
    ASSERT_NE((cta::TapeMount*)NULL, mount.get());
    ASSERT_EQ(cta::common::dataStructures::MountType::Archive, mount.get()->getMountType());
    std::unique_ptr<cta::ArchiveMount> archiveMount;
    archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()));
    ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get());
    // The file should be retried twice
    for (int i=0; i<=1; i++) {
      std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobList = archiveMount->getNextJobBatch(1,1,lc);
      if (!archiveJobList.front().get()) {
        int __attribute__((__unused__)) debugI=i;
      }
      ASSERT_NE(0, archiveJobList.size());
      // Validate we got the right file
      ASSERT_EQ(archiveFileId, archiveJobList.front()->archiveFile.archiveFileID);
      archiveJobList.front()->failed("Archive failed", lc);
    }
    // Then the request should be gone
    ASSERT_EQ(0, archiveMount->getNextJobBatch(1,1,lc).size());
  }
}

TEST_P(SchedulerTest, retrieve_non_existing_file) {
  using namespace cta;
  
  setupDefaultCatalogue();
  
  Scheduler &scheduler = getScheduler();
  
  log::DummyLogger dl("", "");
  log::LogContext lc(dl);

  {
    cta::common::dataStructures::EntryLog creationLog;
    creationLog.host="host2";
    creationLog.time=0;
    creationLog.username="admin1";
    cta::common::dataStructures::DiskFileInfo diskFileInfo;
    diskFileInfo.recoveryBlob="blob";
    diskFileInfo.group="group2";
    diskFileInfo.owner="cms_user";
    diskFileInfo.path="path/to/file";
    cta::common::dataStructures::RetrieveRequest request;
    request.archiveFileID = 12345;
    request.creationLog = creationLog;
    request.diskFileInfo = diskFileInfo;
    request.dstURL = "dstURL";
    request.requester.name = s_userName;
    request.requester.group = "userGroup";
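    // The archive file ID is not known to the catalogue, so queueing the retrieve must throw.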
    ASSERT_THROW(scheduler.queueRetrieve("disk_instance", request, lc), cta::exception::Exception);
  }
}

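// Check that a queued archive request is reported by the scheduler's queue and mount summaries.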
TEST_P(SchedulerTest, showqueues) {
  using namespace cta;
  
  setupDefaultCatalogue();
  
  Scheduler &scheduler = getScheduler();
  
  log::DummyLogger dl("", "");
  log::LogContext lc(dl);
  
  uint64_t archiveFileId __attribute__((unused));
  {
    // Queue an archive request.
    cta::common::dataStructures::EntryLog creationLog;
    creationLog.host="host2";
    creationLog.time=0;
    creationLog.username="admin1";
    cta::common::dataStructures::DiskFileInfo diskFileInfo;
    diskFileInfo.recoveryBlob="blob";
    diskFileInfo.group="group2";
    diskFileInfo.owner="cms_user";
    diskFileInfo.path="path/to/file";
    cta::common::dataStructures::ArchiveRequest request;
    request.checksumType="ADLER32";
    request.checksumValue="1111";
    request.creationLog=creationLog;
    request.diskFileInfo=diskFileInfo;
    request.diskFileID="diskFileID";
    request.fileSize=100*1000*1000;
    cta::common::dataStructures::UserIdentity requester;
    requester.name = s_userName;
    requester.group = "userGroup";
    request.requester = requester;
    request.srcURL="srcURL";
    request.storageClass=s_storageClassName;
    archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, request.storageClass, request.requester, lc);
    scheduler.queueArchiveWithGivenId(archiveFileId, s_diskInstance, request, lc);
  }
  scheduler.waitSchedulerDbSubthreadsComplete();
  
  // get the queues from scheduler
  auto queuesSummary = scheduler.getQueuesAndMountSummaries(lc);
  ASSERT_EQ(1, queuesSummary.size());
}

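// Instantiate the parameterized test suite for each scheduler database backend: the mock
// DB (currently disabled), the VFS-backed object store and, when enabled, the Rados backend.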
#undef TEST_MOCK_DB
#ifdef TEST_MOCK_DB
static cta::MockSchedulerDatabaseFactory mockDbFactory;
INSTANTIATE_TEST_CASE_P(MockSchedulerTest, SchedulerTest,
  ::testing::Values(SchedulerTestParam(mockDbFactory)));
#endif

#define TEST_VFS
#ifdef TEST_VFS
static cta::OStoreDBFactory<cta::objectstore::BackendVFS> OStoreDBFactoryVFS;

INSTANTIATE_TEST_CASE_P(OStoreDBPlusMockSchedulerTestVFS, SchedulerTest,
  ::testing::Values(SchedulerTestParam(OStoreDBFactoryVFS)));
#endif

#ifdef TEST_RADOS
static cta::OStoreDBFactory<cta::objectstore::BackendRados> OStoreDBFactoryRados("rados://tapetest@tapetest");

INSTANTIATE_TEST_CASE_P(OStoreDBPlusMockSchedulerTestRados, SchedulerTest,
  ::testing::Values(SchedulerTestParam(OStoreDBFactoryRados)));
#endif
} // namespace unitTests