/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2018 CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "Scheduler.hpp"
#include "ArchiveMount.hpp"
#include "RetrieveMount.hpp"
#include "common/dataStructures/ArchiveFileQueueCriteriaAndFileId.hpp"
#include "common/utils/utils.hpp"
#include "common/Timer.hpp"
#include "common/exception/NonRetryableError.hpp"
#include "common/exception/UserError.hpp"
#include "common/make_unique.hpp"
#include "objectstore/RepackRequest.hpp"
#include "RetrieveRequestDump.hpp"
#include "disk/DiskFileImplementations.hpp"
#include "disk/RadosStriperPool.hpp"

#include <iostream>
#include <sstream>
#include <iomanip>

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
#include <random>
#include <chrono>

namespace cta {

//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
Scheduler::Scheduler(
  catalogue::Catalogue &catalogue,
  SchedulerDatabase &db, const uint64_t minFilesToWarrantAMount, const uint64_t minBytesToWarrantAMount):
    m_catalogue(catalogue), m_db(db), m_minFilesToWarrantAMount(minFilesToWarrantAMount),
    m_minBytesToWarrantAMount(minBytesToWarrantAMount) {}

//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
Scheduler::~Scheduler() throw() { }

//------------------------------------------------------------------------------
// ping
//------------------------------------------------------------------------------
void Scheduler::ping(log::LogContext & lc) {
  cta::utils::Timer t;
  m_catalogue.ping();
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
  m_db.ping();
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::ping(): success.");
}

//------------------------------------------------------------------------------
// waitSchedulerDbSubthreadsComplete
//------------------------------------------------------------------------------
void Scheduler::waitSchedulerDbSubthreadsComplete() {
  m_db.waitSubthreadsComplete();
}

//------------------------------------------------------------------------------
// authorizeAdmin
//------------------------------------------------------------------------------
void Scheduler::authorizeAdmin(const common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc){
  cta::utils::Timer t;
  if(!(m_catalogue.isAdmin(cliIdentity))) {
    std::stringstream msg;
    msg << "User: " << cliIdentity.username << " on host: " << cliIdentity.host << " is not authorized to execute CTA admin commands";
    throw exception::UserError(msg.str());
  }
  auto catalogueTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime);
  lc.log(log::INFO, "In Scheduler::authorizeAdmin(): success.");
}

//------------------------------------------------------------------------------
// checkAndGetNextArchiveFileId
//------------------------------------------------------------------------------
uint64_t Scheduler::checkAndGetNextArchiveFileId(const std::string &instanceName,
  const std::string &storageClassName, const common::dataStructures::UserIdentity &user, log::LogContext &lc) {
  cta::utils::Timer t;
  const uint64_t archiveFileId = m_catalogue.checkAndGetNextArchiveFileId(instanceName, storageClassName, user);
  const auto catalogueTime = t.secs();
  const auto schedulerDbTime = catalogueTime;

  log::ScopedParamContainer spc(lc);
  spc.add("instanceName", instanceName)
     .add("username", user.name)
     .add("usergroup", user.group)
     .add("storageClass", storageClassName)
     .add("fileId", archiveFileId)
     .add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "Checked request and got next archive file ID");

  return archiveFileId;
}

//------------------------------------------------------------------------------
// queueArchiveWithGivenId
//------------------------------------------------------------------------------
void Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
  const cta::common::dataStructures::ArchiveRequest &request, log::LogContext &lc) {
  cta::utils::Timer t;
  using utils::postEllipsis;
  using utils::midEllipsis;
  if (!request.fileSize)
    throw cta::exception::UserError(std::string("Rejecting archive request for zero-length file: ")+request.diskFileInfo.path);

  const auto queueCriteria = m_catalogue.getArchiveFileQueueCriteria(instanceName, request.storageClass,
    request.requester);
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);

  const common::dataStructures::ArchiveFileQueueCriteriaAndFileId catalogueInfo(archiveFileId,
    queueCriteria.copyToPoolMap, queueCriteria.mountPolicy);

  m_db.queueArchive(instanceName, request, catalogueInfo, lc);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("instanceName", instanceName)
     .add("storageClass", request.storageClass)
     .add("diskFileID", request.diskFileID)
     .add("fileSize", request.fileSize)
     .add("fileId", catalogueInfo.fileId);
  for (auto & ctp: catalogueInfo.copyToPoolMap) {
    std::stringstream tp;
    tp << "tapePool" << ctp.first;
    spc.add(tp.str(), ctp.second);
  }
  spc.add("policyName", catalogueInfo.mountPolicy.name)
     .add("policyArchiveMinAge", catalogueInfo.mountPolicy.archiveMinRequestAge)
     .add("policyArchivePriority", catalogueInfo.mountPolicy.archivePriority)
     .add("policyMaxDrives", catalogueInfo.mountPolicy.maxDrivesAllowed)
     .add("diskFilePath", request.diskFileInfo.path)
     .add("diskFileOwner", request.diskFileInfo.owner)
     .add("diskFileGroup", request.diskFileInfo.group)
     .add("checksumValue", request.checksumValue)
     .add("checksumType", request.checksumType)
     .add("archiveReportURL", midEllipsis(request.archiveReportURL, 50, 15))
     .add("archiveErrorReportURL", midEllipsis(request.archiveErrorReportURL, 50, 15))
     .add("creationHost", request.creationLog.host)
     .add("creationTime", request.creationLog.time)
     .add("creationUser", request.creationLog.username)
     .add("requesterName", request.requester.name)
     .add("requesterGroup", request.requester.group)
     .add("srcURL", midEllipsis(request.srcURL, 50, 15))
     .add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "Queued archive request");
}
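
// Illustrative sketch (not part of the build): how a caller might drive the
// two-step archive protocol implemented by the two methods above. The names
// `scheduler`, `instance`, `user`, `request` and `lc` are assumptions.
//
//   uint64_t archiveFileId =
//       scheduler.checkAndGetNextArchiveFileId(instance, request.storageClass, user, lc);
//   scheduler.queueArchiveWithGivenId(archiveFileId, instance, request, lc);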

//------------------------------------------------------------------------------
// queueArchiveRequestForRepackBatch
//------------------------------------------------------------------------------
void Scheduler::queueArchiveRequestForRepackBatch(std::list<cta::objectstore::ArchiveRequest> &archiveRequests,log::LogContext &lc)
{
  for(auto& archiveRequest : archiveRequests){
    objectstore::ScopedExclusiveLock rReqL(archiveRequest);
    archiveRequest.fetch();
    cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile();
    rReqL.release();
    cta::log::TimingList tl;
    utils::Timer t;
    std::unique_ptr<cta::objectstore::ArchiveRequest>  arUniqPtr = cta::make_unique<cta::objectstore::ArchiveRequest>(archiveRequest);
    this->m_db.queueArchiveForRepack(std::move(arUniqPtr),lc);
    tl.insertOrIncrement("schedulerDbTime", t.secs());
    log::ScopedParamContainer spc(lc);
    spc.add("instanceName", archiveFile.diskInstance)
     .add("storageClass", archiveFile.storageClass)
     .add("diskFileID", archiveFile.diskFileId)
     .add("fileSize", archiveFile.fileSize)
     .add("fileId", archiveFile.archiveFileID);
    tl.addToLog(spc);
    lc.log(log::INFO,"Queued repack archive request");
  }
}

//------------------------------------------------------------------------------
// queueRetrieve
//------------------------------------------------------------------------------
void Scheduler::queueRetrieve(
  const std::string &instanceName,
  const common::dataStructures::RetrieveRequest &request,
  log::LogContext & lc) {
  using utils::postEllipsis;
  using utils::midEllipsis;
  utils::Timer t;
  // Get the queue criteria
  common::dataStructures::RetrieveFileQueueCriteria queueCriteria;
  queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc);
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
  std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("fileId", request.archiveFileID)
     .add("instanceName", instanceName)
     .add("diskFilePath", request.diskFileInfo.path)
     .add("diskFileOwner", request.diskFileInfo.owner)
     .add("diskFileGroup", request.diskFileInfo.group)
     .add("dstURL", request.dstURL)
     .add("errorReportURL", request.errorReportURL)
     .add("creationHost", request.creationLog.host)
     .add("creationTime", request.creationLog.time)
     .add("creationUser", request.creationLog.username)
     .add("requesterName", request.requester.name)
     .add("requesterGroup", request.requester.group)
     .add("criteriaArchiveFileId", queueCriteria.archiveFile.archiveFileID)
     .add("criteriaChecksumType", queueCriteria.archiveFile.checksumType)
     .add("criteriaChecksumValue", queueCriteria.archiveFile.checksumValue)
     .add("criteriaCreationTime", queueCriteria.archiveFile.creationTime)
     .add("criteriaDiskFileId", queueCriteria.archiveFile.diskFileId)
     .add("criteriaDiskFilePath", queueCriteria.archiveFile.diskFileInfo.path)
     .add("criteriaDiskFileOwner", queueCriteria.archiveFile.diskFileInfo.owner)
     .add("criteriaDiskInstance", queueCriteria.archiveFile.diskInstance)
     .add("criteriaFileSize", queueCriteria.archiveFile.fileSize)
     .add("reconciliationTime", queueCriteria.archiveFile.reconciliationTime)
     .add("storageClass", queueCriteria.archiveFile.storageClass);
  for (auto & tf:queueCriteria.archiveFile.tapeFiles) {
    std::stringstream tc;
    tc << "tapeCopy" << tf.copyNb;
    spc.add(tc.str(), tf);
  }
  spc.add("selectedVid", selectedVid)
     .add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime)
     .add("policyName", queueCriteria.mountPolicy.name)
     .add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
     .add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
     .add("policyPriority", queueCriteria.mountPolicy.retrievePriority);
  lc.log(log::INFO, "Queued retrieve request");
}
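
// Illustrative sketch (not part of the build): queueing a retrieve. `scheduler`
// and `lc` are assumptions; the populated fields follow the
// common::dataStructures::RetrieveRequest members logged above, with made-up values.
//
//   common::dataStructures::RetrieveRequest req;
//   req.archiveFileID = 4242;
//   req.dstURL = "root://disk.instance//path/to/destination/file";
//   req.requester.name = "poweruser1";
//   req.requester.group = "powergroup1";
//   scheduler.queueRetrieve("diskInstance1", req, lc);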

//------------------------------------------------------------------------------
// deleteArchive
//------------------------------------------------------------------------------
void Scheduler::deleteArchive(const std::string &instanceName, const common::dataStructures::DeleteArchiveRequest &request,
    log::LogContext & lc) {
  // We have different possible scenarios here. The file can be safely stored in
  // the catalogue, fully queued, or partially queued.
  // First, make sure the file is not queued anymore.
// TEMPORARILY commenting out SchedulerDatabase::deleteArchiveRequest() in order
// to reduce latency.  PLEASE NOTE however that this means files "in-flight" to
// tape will not be deleted and they will appear in the CTA catalogue when they
// are finally written to tape.
//try {
//  m_db.deleteArchiveRequest(instanceName, request.archiveFileID);
//} catch (exception::Exception &dbEx) {
//  // The file was apparently not queued. If we fail to remove it from the
//  // catalogue for any reason other than it does not exist in the catalogue,
//  // then it is an error.
//  m_catalogue.deleteArchiveFile(instanceName, request.archiveFileID);
//}
  utils::Timer t;
  m_catalogue.deleteArchiveFile(instanceName, request.archiveFileID, lc);
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime);
  lc.log(log::INFO, "In Scheduler::deleteArchive(): success.");
}

//------------------------------------------------------------------------------
// cancelRetrieve
//------------------------------------------------------------------------------
void Scheduler::cancelRetrieve(const std::string &instanceName, const common::dataStructures::CancelRetrieveRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// updateFileInfo
//------------------------------------------------------------------------------
void Scheduler::updateFileInfo(const std::string &instanceName, const common::dataStructures::UpdateFileInfoRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// updateFileStorageClass
//------------------------------------------------------------------------------
void Scheduler::updateFileStorageClass(const std::string &instanceName, const common::dataStructures::UpdateFileStorageClassRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// listStorageClass
//------------------------------------------------------------------------------
std::list<common::dataStructures::StorageClass> Scheduler::listStorageClass(const std::string &instanceName, const common::dataStructures::ListStorageClassRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// queueLabel
//------------------------------------------------------------------------------
void Scheduler::queueLabel(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, const bool force) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// queueRepack
//------------------------------------------------------------------------------
void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid,
    const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) {
  // Check request sanity
  if (vid.empty()) throw exception::UserError("Empty VID name.");
  if (bufferURL.empty()) throw exception::UserError("Empty buffer URL.");
  utils::Timer t;
  m_db.queueRepack(vid, bufferURL, repackType, lc);
  log::TimingList tl;
  tl.insertAndReset("schedulerDbTime", t);
  log::ScopedParamContainer params(lc);
  params.add("tapeVid", vid)
        .add("repackType", toString(repackType))
        .add("bufferURL", bufferURL);
  tl.addToLog(params);
  lc.log(log::INFO, "In Scheduler::queueRepack(): success.");
}
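
// Illustrative sketch (not part of the build): queueing, inspecting and cancelling
// a repack through the methods defined here. `scheduler`, `admin` and `lc` are
// assumptions, as are the VID and buffer URL.
//
//   scheduler.queueRepack(admin, "V01007", "root://disk.instance//repack/buffer",
//       common::dataStructures::RepackInfo::Type::MoveOnly, lc);
//   auto info = scheduler.getRepack("V01007");    // poll the request's status
//   scheduler.cancelRepack(admin, "V01007", lc);  // or abort it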

//------------------------------------------------------------------------------
// cancelRepack
//------------------------------------------------------------------------------
void Scheduler::cancelRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, log::LogContext & lc) {
  m_db.cancelRepack(vid, lc);
}

//------------------------------------------------------------------------------
// getRepacks
//------------------------------------------------------------------------------
std::list<common::dataStructures::RepackInfo> Scheduler::getRepacks() {
  return m_db.getRepackInfo();
}

//------------------------------------------------------------------------------
// getRepack
//------------------------------------------------------------------------------
common::dataStructures::RepackInfo Scheduler::getRepack(const std::string &vid) {
  return m_db.getRepackInfo(vid);
}

//------------------------------------------------------------------------------
// promoteRepackRequestsToToExpand
//------------------------------------------------------------------------------
void Scheduler::promoteRepackRequestsToToExpand(log::LogContext & lc) {
  // We target 2 fresh requests available for processing (ToExpand or Starting).
  const size_t targetAvailableRequests = 2;
  // Dry-run test to check if promotion is needed.
  auto repackStatsNL = m_db.getRepackStatisticsNoLock();
  // Statistics are supposed to be initialized for each status value. We only try to
  // expand if there are requests available in Pending status.
  typedef common::dataStructures::RepackInfo::Status Status;
  if (repackStatsNL->at(Status::Pending) &&
          (targetAvailableRequests > repackStatsNL->at(Status::ToExpand) + repackStatsNL->at(Status::Starting))) {
    // Let's try to promote a repack request. Take the lock.
    repackStatsNL.reset();
    decltype(m_db.getRepackStatistics()) repackStats;
    try {
      repackStats = m_db.getRepackStatistics();
    } catch (SchedulerDatabase::RepackRequestStatistics::NoPendingRequestQueue &) {
      // Nothing to promote after all.
      return;
    }
    if (repackStats->at(Status::Pending) &&
            (targetAvailableRequests > repackStats->at(Status::ToExpand) + repackStats->at(Status::Starting))) {
      auto requestsToPromote = targetAvailableRequests;
      requestsToPromote -= repackStats->at(Status::ToExpand);
      requestsToPromote -= repackStats->at(Status::Starting);
      auto stats = repackStats->promotePendingRequestsForExpansion(requestsToPromote, lc);
      log::ScopedParamContainer params(lc);
      params.add("promotedRequests", stats.promotedRequests)
            .add("pendingBefore", stats.pendingBefore)
            .add("toEnpandBefore", stats.toEnpandBefore)
            .add("pendingAfter", stats.pendingAfter)
            .add("toExpandAfter", stats.toExpandAfter);
      lc.log(log::INFO,"In Scheduler::promoteRepackRequestsToToExpand(): Promoted repack request to \"to expand\"");
    }
  }
}
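
// Note on the structure above: this is a check-then-lock pattern. The lock-free
// getRepackStatisticsNoLock() read lets us bail out cheaply when no promotion is
// needed, and the condition is re-evaluated under getRepackStatistics() (which
// takes the lock), guarding against concurrent schedulers promoting requests in
// the meantime.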

//------------------------------------------------------------------------------
// getNextRepackRequestToExpand
//------------------------------------------------------------------------------
std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() {
    std::unique_ptr<cta::SchedulerDatabase::RepackRequest> repackRequest;
    repackRequest = m_db.getNextRepackJobToExpand();
    if(repackRequest != nullptr){
      std::unique_ptr<RepackRequest> ret(new RepackRequest());
      ret->m_dbReq.reset(repackRequest.release());
      return std::move(ret);
    }
    return nullptr;
}

//------------------------------------------------------------------------------
// expandRepackRequest
//------------------------------------------------------------------------------
void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext& lc) {
  std::list<common::dataStructures::ArchiveFile> files;
  auto repackInfo = repackRequest->getRepackInfo();
  cta::SchedulerDatabase::RepackRequest::TotalStatsFiles totalStatsFile;
  typedef cta::common::dataStructures::RepackInfo::Type RepackType;
  if (repackInfo.type != RepackType::MoveOnly) {
    log::ScopedParamContainer params(lc);
    params.add("tapeVid", repackInfo.vid);
    lc.log(log::ERR, "In Scheduler::expandRepackRequest(): failing repack request with unsupported (yet) type.");
    repackRequest->fail();
    return;
  }
  // We need the ArchiveRoutes to find the tape pool holding each tape on which
  // a copy of the file is located.
  std::list<common::dataStructures::ArchiveRoute> routes = m_catalogue.getArchiveRoutes();
  // To identify a route we need both the disk instance name and the storage class name,
  // thus the key of the map is a pair of strings.
  cta::common::dataStructures::ArchiveRoute::FullMap archiveRoutesMap;
  for(auto route: routes){
    //insert the route into the map to allow a quick retrieval
    archiveRoutesMap[std::make_pair(route.diskInstanceName,route.storageClassName)][route.copyNb] = route;
  }
  uint64_t fSeq = repackRequest->m_dbReq->getLastExpandedFSeq() + 1;
  cta::catalogue::ArchiveFileItor archiveFilesForCatalogue = m_catalogue.getArchiveFilesForRepackItor(repackInfo.vid, fSeq);
  std::stringstream dirBufferURL;
  dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/";
  cta::disk::DirectoryFactory dirFactory;
  std::unique_ptr<cta::disk::Directory> dir;
  dir.reset(dirFactory.createDirectory(dirBufferURL.str()));
  std::set<std::string> filesInDirectory;
  if(dir->exist()){
    filesInDirectory = dir->getFilesName();
  } else {
    dir->mkdir();
  }
  while(archiveFilesForCatalogue.hasMore()) {
    size_t filesCount = 0;
    uint64_t maxAddedFSeq = 0;
    std::list<SchedulerDatabase::RepackRequest::Subrequest> retrieveSubrequests;
    while(filesCount < c_defaultMaxNbFilesForRepack && archiveFilesForCatalogue.hasMore())
    {
      filesCount++;
      fSeq++;
      retrieveSubrequests.push_back(cta::SchedulerDatabase::RepackRequest::Subrequest());
      auto archiveFile = archiveFilesForCatalogue.next();
      auto & retrieveSubRequest  = retrieveSubrequests.back();
      
      retrieveSubRequest.archiveFile = archiveFile;
      retrieveSubRequest.fSeq = std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max();
      
      // We have to determine which copynbs we want to rearchive, and under which fSeq we record this file.
      if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::MoveOnly) {
        // determine which fSeq(s) (normally only one) lives on this tape.
        for (auto & tc: archiveFile.tapeFiles) if (tc.vid == repackInfo.vid) {
          retrieveSubRequest.copyNbsToRearchive.insert(tc.copyNb);
          // We make the (reasonable) assumption that the archive file only has one copy on this tape.
          // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape.
          // This will prevent double subrequest creation (we already have such a mechanism in case of
          // crash and restart of expansion).
          retrieveSubRequest.fSeq = std::min(tc.fSeq, retrieveSubRequest.fSeq);
          totalStatsFile.totalFilesToArchive += 1;
          totalStatsFile.totalBytesToArchive += retrieveSubRequest.archiveFile.fileSize;
        }
      }
      std::stringstream fileName;
      fileName << std::setw(9) << std::setfill('0') << retrieveSubRequest.fSeq;
      bool createArchiveSubrequest = false;
      if(filesInDirectory.count(fileName.str())){
        cta::disk::RadosStriperPool radosStriperPool;
        cta::disk::DiskFileFactory fileFactory("",0,radosStriperPool);
        // Own the reader so it is closed and freed when we leave this scope.
        std::unique_ptr<cta::disk::ReadFile> fileReader(fileFactory.createReadFile(dirBufferURL.str() + fileName.str()));
        if(fileReader->size() == archiveFile.fileSize){
          createArchiveSubrequest = true;
          retrieveSubrequests.pop_back();
          //TODO : We don't want to retrieve the file again, create archive subrequest
        }
      }
      if(!createArchiveSubrequest){
        totalStatsFile.totalBytesToRetrieve += retrieveSubRequest.archiveFile.fileSize;
        totalStatsFile.totalFilesToRetrieve += 1;
        if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::AddCopiesOnly) {
          // We should not get here as the type is filtered at the beginning of the function.
          // TODO: add support for expand.
          throw cta::exception::Exception("In Scheduler::expandRepackRequest(): expand not yet supported.");
        }
        if ((retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) || retrieveSubRequest.copyNbsToRearchive.empty()) {
          log::ScopedParamContainer params(lc);
          params.add("fileId", retrieveSubRequest.archiveFile.archiveFileID)
                .add("repackVid", repackInfo.vid);
          lc.log(log::ERR, "In Scheduler::expandRepackRequest(): no fSeq found for this file on this tape.");
          retrieveSubrequests.pop_back();
        } else {
          // We found some copies to rearchive. We still have to decide which file path we are going to use.
          // File path will be base URL + /<VID>/<fSeq>
          /*std::stringstream fileBufferURL;
          fileBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/" 
              << std::setw(9) << std::setfill('0') << rsr.fSeq;*/
          maxAddedFSeq = std::max(maxAddedFSeq,retrieveSubRequest.fSeq);
          retrieveSubRequest.fileBufferURL = dirBufferURL.str() + fileName.str();
        }
      }
    }
    // Note: the highest fSeq will be recorded internally in the following call.
    // We know that the fSeqs processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted).
    // We pass this information to the db for recording in the repack request. This will allow restarting from the right
    // value in case of crash.
    repackRequest->m_dbReq->setTotalStats(totalStatsFile);
    repackRequest->m_dbReq->addSubrequests(retrieveSubrequests, archiveRoutesMap, fSeq - 1, lc);
    fSeq = std::max(fSeq, maxAddedFSeq + 1);
    repackRequest->m_dbReq->setLastExpandedFSeq(fSeq);
  }
  repackRequest->m_dbReq->expandDone();
}
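
// Illustrative sketch (not part of the build): the driver loop a repack maintenance
// pass might run around the three repack methods above. `scheduler`, `tl`, `t` and
// `lc` are assumptions.
//
//   scheduler.promoteRepackRequestsToToExpand(lc);
//   while (std::unique_ptr<RepackRequest> rr = scheduler.getNextRepackRequestToExpand()) {
//     scheduler.expandRepackRequest(rr, tl, t, lc);
//   }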

//------------------------------------------------------------------------------
// Scheduler::getNextRepackReportBatch
//------------------------------------------------------------------------------
Scheduler::RepackReportBatch Scheduler::getNextRepackReportBatch(log::LogContext& lc) {
  RepackReportBatch ret;
  ret.m_DbBatch = std::move(m_db.getNextRepackReportBatch(lc));
  return ret;
}

//------------------------------------------------------------------------------
// Scheduler::RepackReportBatch::report
//------------------------------------------------------------------------------
void Scheduler::RepackReportBatch::report(log::LogContext& lc) {
  if (nullptr == m_DbBatch) {
    // lc.log(log::DEBUG, "In Scheduler::RepackReportBatch::report(): empty batch.");
  } else {
    m_DbBatch->report(lc);
  }
}

//------------------------------------------------------------------------------
// getDesiredDriveState
//------------------------------------------------------------------------------
common::dataStructures::DesiredDriveState Scheduler::getDesiredDriveState(const std::string& driveName, log::LogContext & lc) {
  utils::Timer t;
  auto driveStates = m_db.getDriveStates(lc);
  for (auto & d: driveStates) {
    if (d.driveName == driveName) {
      auto schedulerDbTime = t.secs();
      if (schedulerDbTime > 1) {
        log::ScopedParamContainer spc(lc);
        spc.add("drive", driveName)
           .add("schedulerDbTime", schedulerDbTime);
        lc.log(log::INFO, "In Scheduler::getDesiredDriveState(): success.");
      }
      return d.desiredDriveState;
    }
  }
  throw NoSuchDrive ("In Scheduler::getDesiredDriveState(): no such drive");
}
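
// Note: the success message above is deliberately logged only when the scheduler DB
// lookup took more than one second, so this frequently-polled call stays quiet in
// the normal case (reportDriveStatus() below uses the same trick).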

//------------------------------------------------------------------------------
// setDesiredDriveState
//------------------------------------------------------------------------------
void Scheduler::setDesiredDriveState(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &driveName, const bool up, const bool force, log::LogContext & lc) {
  utils::Timer t;
  common::dataStructures::DesiredDriveState desiredDriveState;
  desiredDriveState.up = up;
  desiredDriveState.forceDown = force;
  m_db.setDesiredDriveState(driveName, desiredDriveState, lc);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("drive", driveName)
     .add("up", up?"up":"down")
     .add("force", force?"yes":"no")
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::setDesiredDriveState(): success.");
}
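
// Illustrative sketch (not part of the build): bringing a drive up, or forcing it
// down, through the method above. `scheduler`, `admin` and `lc` are assumptions,
// as is the drive name.
//
//   scheduler.setDesiredDriveState(admin, "DRIVE0", true,  false, lc); // drive up
//   scheduler.setDesiredDriveState(admin, "DRIVE0", false, true,  lc); // forced down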

//------------------------------------------------------------------------------
// removeDrive
//------------------------------------------------------------------------------
void Scheduler::removeDrive(const common::dataStructures::SecurityIdentity &cliIdentity, 
  const std::string &driveName, log::LogContext & lc) {
  utils::Timer t;
  m_db.removeDrive(driveName, lc);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("drive", driveName)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::removeDrive(): success.");   
}

//------------------------------------------------------------------------------
// reportDriveStatus
//------------------------------------------------------------------------------
void Scheduler::reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo, common::dataStructures::MountType type, common::dataStructures::DriveStatus status, log::LogContext & lc) {
  // TODO: mount type should be transmitted too.
  utils::Timer t;
  m_db.reportDriveStatus(driveInfo, type, status, time(NULL), lc);
  auto schedulerDbTime = t.secs();
  if (schedulerDbTime > 1) {
    log::ScopedParamContainer spc(lc);
    spc.add("drive", driveInfo.driveName)
       .add("schedulerDbTime", schedulerDbTime);
    lc.log(log::INFO, "In Scheduler::reportDriveStatus(): success.");
  }
}

//------------------------------------------------------------------------------
// getPendingArchiveJobs
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::ArchiveJob> > Scheduler::getPendingArchiveJobs(log::LogContext & lc) const {
  utils::Timer t;
  auto ret = m_db.getArchiveJobs();
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(): success.");
  return ret;
}

//------------------------------------------------------------------------------
// getPendingArchiveJobs
//------------------------------------------------------------------------------
std::list<common::dataStructures::ArchiveJob> Scheduler::getPendingArchiveJobs(const std::string &tapePoolName, log::LogContext & lc) const {
  utils::Timer t;
  if(!m_catalogue.tapePoolExists(tapePoolName)) {
    throw exception::UserError(std::string("Tape pool ") + tapePoolName + " does not exist");
  }
  auto catalogueTime = t.secs(utils::Timer::resetCounter);
  auto ret = m_db.getArchiveJobs(tapePoolName);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(tapePool): success.");
  return ret;
}

//------------------------------------------------------------------------------
// getPendingRetrieveJobs
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::RetrieveJob> > Scheduler::getPendingRetrieveJobs(log::LogContext & lc) const {
  utils::Timer t;
  auto ret =  m_db.getRetrieveJobs();
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingRetrieveJobs(): success.");
  return ret;
}

//------------------------------------------------------------------------------
// getPendingRetrieveJobs
//------------------------------------------------------------------------------
std::list<common::dataStructures::RetrieveJob> Scheduler::getPendingRetrieveJobs(const std::string& vid, log::LogContext &lc) const {
  utils::Timer t;
  auto ret =  m_db.getRetrieveJobs(vid);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingRetrieveJobs(vid): success.");
  return ret;
}

//------------------------------------------------------------------------------
// getDriveStates
//------------------------------------------------------------------------------
std::list<common::dataStructures::DriveState> Scheduler::getDriveStates(const common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc) const {
  utils::Timer t;
  auto ret = m_db.getDriveStates(lc);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getDriveStates(): success.");
  return ret;
}

//------------------------------------------------------------------------------
// sortAndGetTapesForMountInfo
//------------------------------------------------------------------------------
void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>& mountInfo,
    const std::string & logicalLibraryName, const std::string & driveName, utils::Timer & timer,
    std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList,
    double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc) {
  // The library information is not known for the tapes involved in retrieves. We
  // need to query the catalogue now about all those tapes.
  // Build the list of tapes.
  std::set<std::string> tapeSet;
  for (auto &m:mountInfo->potentialMounts) {
    if (m.type==common::dataStructures::MountType::Retrieve) tapeSet.insert(m.vid);
  }
  if (tapeSet.size()) {
    auto tapesInfo=m_catalogue.getTapesByVid(tapeSet);
    getTapeInfoTime = timer.secs(utils::Timer::resetCounter);
    for (auto &m:mountInfo->potentialMounts) {
      if (m.type==common::dataStructures::MountType::Retrieve) {
        m.logicalLibrary=tapesInfo[m.vid].logicalLibraryName;
        m.tapePool=tapesInfo[m.vid].tapePoolName;
        m.vendor = tapesInfo[m.vid].vendor;
        m.mediaType = tapesInfo[m.vid].mediaType;
        m.vo = tapesInfo[m.vid].vo;
        m.capacityInBytes = tapesInfo[m.vid].capacityInBytes;
      }
    }
  }
  
  // We should now filter the potential mounts to keep only the ones we are
  // compatible with (match the logical library for retrieves).
  // We cannot filter the archive mounts yet: their logical library is only
  // matched later on, when a tape is picked from the pool.
  for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
    if (m->type == common::dataStructures::MountType::Retrieve && m->logicalLibrary != logicalLibraryName) {
      m = mountInfo->potentialMounts.erase(m);
    } else {
      m++;
    }
  }
  
  // With the existing mount list, we can now populate the potential mount list
  // with the per tape pool existing mount statistics.
  for (auto & em: mountInfo->existingOrNextMounts) {
    // If a mount is still listed for our own drive, it is a leftover that we disregard.
    if (em.driveName!=driveName) {
      try {
        existingMountsSummary.at(tpType(em.tapePool, common::dataStructures::getMountBasicType(em.type)))++;
      } catch (std::out_of_range &) {
        existingMountsSummary[tpType(em.tapePool, common::dataStructures::getMountBasicType(em.type))] = 1;
      }
      if (em.vid.size()) {
        tapesInUse.insert(em.vid);
        log::ScopedParamContainer params(lc);
        params.add("tapeVid", em.vid)
              .add("mountType", common::dataStructures::toString(em.type))
              .add("drive", em.driveName);
        lc.log(log::DEBUG,"In Scheduler::sortAndGetTapesForMountInfo(): tapeAlreadyInUse found.");
      }
    }
  }
  
  // We can now filter out the potential mounts whose mount criteria are not met,
  // filter out the potential mounts for which the maximum mount quota is already
  // reached, and weight the remaining by how much of their quota is used.
  for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
    // Get summary data
    uint32_t existingMounts;
    try {
      existingMounts = existingMountsSummary.at(tpType(m->tapePool, common::dataStructures::getMountBasicType(m->type)));
    } catch (std::out_of_range &) {
      existingMounts = 0;
    }
    uint32_t effectiveExistingMounts = 0;
    if (m->type == common::dataStructures::MountType::ArchiveForUser) effectiveExistingMounts = existingMounts;
    bool mountPassesACriteria = false;
    if (m->bytesQueued / (1 + effectiveExistingMounts) >= m_minBytesToWarrantAMount)
      mountPassesACriteria = true;
    if (m->filesQueued / (1 + effectiveExistingMounts) >= m_minFilesToWarrantAMount)
      mountPassesACriteria = true;
    if (!effectiveExistingMounts && ((time(NULL) - m->oldestJobStartTime) > m->minRequestAge))
      mountPassesACriteria = true;
    if (!mountPassesACriteria || existingMounts >= m->maxDrivesAllowed) {
      log::ScopedParamContainer params(lc);
      params.add("tapePool", m->tapePool);
      if ( m->type == common::dataStructures::MountType::Retrieve) {
        params.add("tapeVid", m->vid);
      }
      params.add("mountType", common::dataStructures::toString(m->type))
            .add("existingMounts", existingMounts)
            .add("bytesQueued", m->bytesQueued)
            .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
            .add("filesQueued", m->filesQueued)
            .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
            .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
            .add("minArchiveRequestAge", m->minRequestAge)
            .add("maxDrivesAllowed", m->maxDrivesAllowed);
      lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Removing potential mount not passing criteria");
      m = mountInfo->potentialMounts.erase(m);
    } else {
      // populate the mount with a weight 
      m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->maxDrivesAllowed;
      log::ScopedParamContainer params(lc);
      params.add("tapePool", m->tapePool);
      if ( m->type == common::dataStructures::MountType::Retrieve) {
        params.add("tapeVid", m->vid);
      }
      params.add("mountType", common::dataStructures::toString(m->type))
            .add("existingMounts", existingMounts)
            .add("bytesQueued", m->bytesQueued)
            .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
            .add("filesQueued", m->filesQueued)
            .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
            .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
808
            .add("minArchiveRequestAge", m->minRequestAge)
Eric Cano's avatar
Eric Cano committed
809
810
811
            .add("existingMounts", existingMounts)
            .add("maxDrivesAllowed", m->maxDrivesAllowed)
            .add("ratioOfMountQuotaUsed", m->ratioOfMountQuotaUsed);
812
      lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Will consider potential mount");
      m++;
    }
  }
  
  // We can now sort the potential mounts in decreasing priority order. 
  // The ordering is defined in operator <.
  // We want the result in descending order of priority, so we reverse the vector.
  std::sort(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
  std::reverse(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
  
  candidateSortingTime = timer.secs(utils::Timer::resetCounter);
  
  // Find out if we have any potential archive mount in the list. If so, get the
  // list of tapes from the catalogue.
  if (std::count_if(
        mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(), 
        [](decltype(*mountInfo->potentialMounts.cbegin())& m){ return common::dataStructures::getMountBasicType(m.type) == common::dataStructures::MountType::ArchiveAllTypes; } )) {
    tapeList = m_catalogue.getTapesForWriting(logicalLibraryName);
    getTapeForWriteTime = timer.secs(utils::Timer::resetCounter);
  }

  // Remove from the tape list the ones already or soon to be mounted
  auto t=tapeList.begin();
  while (t!=tapeList.end()) {
    if (tapesInUse.count(t->vid)) {
      t=tapeList.erase(t);
    } else {
      t++;
    }
  }
}
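
// Worked example (hedged, with made-up thresholds) of the criteria applied above:
// with m_minFilesToWarrantAMount = 100 and m_minBytesToWarrantAMount = 80 GB, a tape
// pool with 500 files / 10 GB queued and 3 existing archive mounts gives
// 500 / (1 + 3) = 125 files per prospective mount, so the file-count criterion
// passes even though the byte criterion does not (10 GB / 4 < 80 GB); a single
// passing criterion is enough. The mount is still dropped if existingMounts has
// already reached maxDrivesAllowed.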

//------------------------------------------------------------------------------
// getNextMountDryRun
//------------------------------------------------------------------------------
bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const std::string& driveName, log::LogContext& lc) {
  // We run the same algorithm as the actual getNextMount without the global lock
  // For this reason, we just return true as soon as valid mount has been found.
  utils::Timer timer;
  double getMountInfoTime = 0;
  double getTapeInfoTime = 0;
  double candidateSortingTime = 0;
  double getTapeForWriteTime = 0;
  double decisionTime = 0;
  double schedulerDbTime = 0;
  double catalogueTime = 0;
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
  mountInfo = m_db.getMountInfoNoLock(lc);
  getMountInfoTime = timer.secs(utils::Timer::resetCounter);
  std::map<tpType, uint32_t> existingMountsSummary;
  std::set<std::string> tapesInUse;
  std::list<catalogue::TapeForWriting> tapeList;
  
  sortAndGetTapesForMountInfo(mountInfo, logicalLibraryName, driveName, timer,
      existingMountsSummary, tapesInUse, tapeList,
      getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc);
  
  // We can now simply iterate on the candidates until we manage to find a valid mount
  for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
    // If the mount is an archive, we still have to find a tape.
    if (common::dataStructures::getMountBasicType(m->type)==common::dataStructures::MountType::ArchiveAllTypes) {
      // We need to find a tape for archiving. It should be both in the right 
      // tape pool and in the drive's logical library
      // As a first prototype, the first matching tape will do.
      // TODO: improve to reuse already partially written tapes and randomization
      for (auto & t: tapeList) {
        if (t.tapePool == m->tapePool) {
          // We have our tape. That's enough.
          decisionTime += timer.secs(utils::Timer::resetCounter);
          schedulerDbTime = getMountInfoTime;
          catalogueTime = getTapeInfoTime + getTapeForWriteTime;
          uint32_t existingMounts = 0;
          try {
            existingMounts=existingMountsSummary.at(tpType(m->tapePool, common::dataStructures::getMountBasicType(m->type)));
          } catch (...) {}
          log::ScopedParamContainer params(lc);
          params.add("tapePool", m->tapePool)
                .add("tapeVid", t.vid)
                .add("mountType", common::dataStructures::toString(m->type))
                .add("existingMounts", existingMounts)
                .add("bytesQueued", m->bytesQueued)
                .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
                .add("filesQueued", m->filesQueued)
                .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
                .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
                .add("minArchiveRequestAge", m->minRequestAge)
                .add("getMountInfoTime", getMountInfoTime)
                .add("getTapeInfoTime", getTapeInfoTime)
                .add("candidateSortingTime", candidateSortingTime)
                .add("getTapeForWriteTime", getTapeForWriteTime)
                .add("decisionTime", decisionTime)
                .add("schedulerDbTime", schedulerDbTime)
                .add("catalogueTime", catalogueTime);
          lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): Found a potential mount (archive)");
          return true;
        }
      }
    } else if (m->type==common::dataStructures::MountType::Retrieve) {
      // We know the tape we intend to mount. We have to validate the tape is 
      // actually available to read (not mounted or about to be mounted, and pass 
      // on it if so).
      if (tapesInUse.count(m->vid)) continue;
      decisionTime += timer.secs(utils::Timer::resetCounter);
      log::ScopedParamContainer params(lc);
      uint32_t existingMounts = 0;
      try {
        existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type));
      } catch (...) {}
      schedulerDbTime = getMountInfoTime;
      catalogueTime = getTapeInfoTime + getTapeForWriteTime;
      params.add("tapePool", m->tapePool)
            .add("tapeVid", m->vid)
            .add("mountType", common::dataStructures::toString(m->type))
            .add("existingMounts", existingMounts)
            .add("bytesQueued", m->bytesQueued)
            .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
            .add("filesQueued", m->filesQueued)
            .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
            .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
            .add("minArchiveRequestAge", m->minRequestAge)
            .add("getMountInfoTime", getMountInfoTime)
            .add("getTapeInfoTime", getTapeInfoTime)
            .add("candidateSortingTime", candidateSortingTime)
            .add("getTapeForWriteTime", getTapeForWriteTime)
            .add("decisionTime", decisionTime)
            .add("schedulerDbTime", schedulerDbTime)
            .add("catalogueTime", catalogueTime);
      lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): Found a potential mount (retrieve)");
      return true;
    }
  }
  schedulerDbTime = getMountInfoTime;
  catalogueTime = getTapeInfoTime + getTapeForWriteTime;
  decisionTime += timer.secs(utils::Timer::resetCounter);
  log::ScopedParamContainer params(lc);
  params.add("getMountInfoTime", getMountInfoTime)
        .add("getTapeInfoTime", getTapeInfoTime)
        .add("candidateSortingTime", candidateSortingTime)
        .add("getTapeForWriteTime", getTapeForWriteTime)
        .add("decisionTime", decisionTime)
        .add("schedulerDbTime", schedulerDbTime)
        .add("catalogueTime", catalogueTime);
  if ((getMountInfoTime > 1) || (getTapeInfoTime > 1) || (candidateSortingTime > 1) || (getTapeForWriteTime > 1) ||
      (decisionTime > 1) || (schedulerDbTime > 1) || (catalogueTime > 1))
    lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): No valid mount found.");
  return false;
}
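
// Illustrative sketch (not part of the build): how a drive loop might pair the
// lock-free dry run with the locked scheduling pass below. `scheduler`, `lib`,
// `drive` and `lc` are assumptions. Since the dry run takes no lock, the real
// getNextMount() can still legitimately come back empty.
//
//   if (scheduler.getNextMountDryRun(lib, drive, lc)) {
//     std::unique_ptr<TapeMount> mount = scheduler.getNextMount(lib, drive, lc);
//     if (mount) { /* start the data transfer session */ }
//   }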


//------------------------------------------------------------------------------
// getNextMount
//------------------------------------------------------------------------------
std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc) {
  // In order to decide the next mount to do, we have to take a global lock on
  // the scheduling, retrieve a list of all running mounts and of the queue sizes
  // for tapes and tape pools, filter out the candidates whose queues are below
  // threshold, and order the rest by priority. We then pick candidates one at a
  // time, attempting to get a tape from the catalogue for the archive mounts,
  // and walk the list until we find a suitable one and return the mount, or
  // exhaust all of them and return no mount.
  // Many steps of this logic are not specific to the database and are hence
  // implemented in the scheduler itself.
  // First, get the mount-related info from the DB
  utils::Timer timer;
  double getMountInfoTime = 0;
  double queueTrimingTime = 0;
  double getTapeInfoTime = 0;
  double candidateSortingTime = 0;
  double getTapeForWriteTime = 0;
  double decisionTime = 0;
  double mountCreationTime = 0;
  double driveStatusSetTime = 0;
  double schedulerDbTime = 0;
  double catalogueTime = 0;
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
  mountInfo = m_db.getMountInfo(lc);
  getMountInfoTime = timer.secs(utils::Timer::resetCounter);
  if (mountInfo->queueTrimRequired) {
    m_db.trimEmptyQueues(lc);
    queueTrimingTime = timer.secs(utils::Timer::resetCounter);
  }
  __attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo;
  
  std::map<tpType, uint32_t> existingMountsSummary;
  std::set<std::string> tapesInUse;
  std::list<catalogue::TapeForWriting> tapeList;