Scheduler.cpp 62.8 KB
Newer Older
1
/*
2
3
 * The CERN Tape Archive(CTA) project
 * Copyright(C) 2015  CERN
4
5
6
7
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
8
 *(at your option) any later version.
9
10
 *
 * This program is distributed in the hope that it will be useful,
11
12
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
15
16
17
18
19
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

21

22
23
24
#include "Scheduler.hpp"
#include "ArchiveMount.hpp"
#include "RetrieveMount.hpp"
25
#include "common/dataStructures/ArchiveFileQueueCriteriaAndFileId.hpp"
26
#include "common/utils/utils.hpp"
27
#include "common/Timer.hpp"
28
#include "common/exception/NonRetryableError.hpp"
Daniele Kruse's avatar
Daniele Kruse committed
29
#include "common/exception/UserError.hpp"
30
#include "common/make_unique.hpp"
31
#include "objectstore/RepackRequest.hpp"
32
#include "RetrieveRequestDump.hpp"
33

34
#include <iostream>
35
#include <sstream>
36
37
38
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
Eric Cano's avatar
Eric Cano committed
39
#include <algorithm>
40
41
#include <random>
#include <chrono>
42

43
44
namespace cta {

45
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
// catalogue: file/tape metadata backend. db: scheduler queue backend.
// minFilesToWarrantAMount / minBytesToWarrantAMount: thresholds below which a
// queue presumably does not justify starting a tape mount — confirm against
// the mount-decision logic elsewhere in this class.
Scheduler::Scheduler(
  catalogue::Catalogue &catalogue,
  SchedulerDatabase &db, const uint64_t minFilesToWarrantAMount, const uint64_t minBytesToWarrantAMount): 
    m_catalogue(catalogue), m_db(db), m_minFilesToWarrantAMount(minFilesToWarrantAMount), 
    m_minBytesToWarrantAMount(minBytesToWarrantAMount) {}
53
54
55
56

//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
// Trivial; the dynamic exception specification throw() must stay to match the
// declaration in the header.
Scheduler::~Scheduler() throw() { }
59
60
61

//------------------------------------------------------------------------------
// ping
//------------------------------------------------------------------------------
62
63
void Scheduler::ping(log::LogContext & lc) {
  cta::utils::Timer t;
64
  m_catalogue.ping();
65
66
67
68
69
70
71
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
  m_db.ping();
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::ping(): success.");
72
}
73

74
75
76
77
78
79
80
//------------------------------------------------------------------------------
// waitSchedulerDbThreads
//------------------------------------------------------------------------------
// Blocks until the scheduler DB reports its subthreads complete (pure
// delegation; the waiting semantics live in the SchedulerDatabase backend).
void Scheduler::waitSchedulerDbSubthreadsComplete() {
  m_db.waitSubthreadsComplete();
}

81
//------------------------------------------------------------------------------
Daniele Kruse's avatar
Daniele Kruse committed
82
// authorizeAdmin
83
//------------------------------------------------------------------------------
84
85
void Scheduler::authorizeAdmin(const common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc){
  cta::utils::Timer t;
Daniele Kruse's avatar
Daniele Kruse committed
86
87
88
  if(!(m_catalogue.isAdmin(cliIdentity))) {
    std::stringstream msg;
    msg << "User: " << cliIdentity.username << " on host: " << cliIdentity.host << " is not authorized to execute CTA admin commands";
89
    throw exception::UserError(msg.str());
Daniele Kruse's avatar
Daniele Kruse committed
90
  }
91
92
93
94
  auto catalogueTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime);
  lc.log(log::INFO, "In Scheduler::authorizeAdmin(): success.");
95
96
}

97
98
99
100
101
102
103
104
105
106
107
108
//------------------------------------------------------------------------------
// checkAndGetNextArchiveFileId
//------------------------------------------------------------------------------
// Validates the archive request against the catalogue and reserves the next
// archive file ID. Returns that ID.
uint64_t Scheduler::checkAndGetNextArchiveFileId(const std::string &instanceName,
  const std::string &storageClassName, const common::dataStructures::UserIdentity &user, log::LogContext &lc) {
  cta::utils::Timer timer;
  const uint64_t archiveFileId = m_catalogue.checkAndGetNextArchiveFileId(instanceName, storageClassName, user);
  const auto catalogueTime = timer.secs();
  // No scheduler DB access happens here; the same figure is reported for both
  // timing parameters.
  const auto schedulerDbTime = catalogueTime;

  log::ScopedParamContainer params(lc);
  params.add("instanceName", instanceName);
  params.add("username", user.name);
  params.add("usergroup", user.group);
  params.add("storageClass", storageClassName);
  params.add("fileId", archiveFileId);
  params.add("catalogueTime", catalogueTime);
  params.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "Checked request and got next archive file ID");

  return archiveFileId;
}
120
121
122
123
124
125
126
127
//------------------------------------------------------------------------------
// queueArchiveWithGivenId
//------------------------------------------------------------------------------
// Queues an archive request under a pre-allocated archive file ID: fetches the
// queue criteria (copy-to-pool map and mount policy) from the catalogue, then
// hands the request to the scheduler DB. Rejects zero-length files.
// Throws exception::UserError for zero-length files; catalogue/DB exceptions
// propagate.
void Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
  const cta::common::dataStructures::ArchiveRequest &request, log::LogContext &lc) {
  cta::utils::Timer t;
  using utils::postEllipsis;
  using utils::midEllipsis;
  
  // Zero-length files are rejected up front: nothing to write to tape.
  if (!request.fileSize)
    throw cta::exception::UserError(std::string("Rejecting archive request for zero-length file: ")+request.diskFileInfo.path);

  const auto queueCriteria = m_catalogue.getArchiveFileQueueCriteria(instanceName, request.storageClass,
    request.requester);
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);

  // Bundle the caller-supplied file ID with the catalogue's routing/policy info.
  const common::dataStructures::ArchiveFileQueueCriteriaAndFileId catalogueInfo(archiveFileId,
    queueCriteria.copyToPoolMap, queueCriteria.mountPolicy);

  m_db.queueArchive(instanceName, request, catalogueInfo, lc);
  auto schedulerDbTime = t.secs();
  // Everything below is logging only.
  log::ScopedParamContainer spc(lc);
  spc.add("instanceName", instanceName)
     .add("storageClass", request.storageClass)
     .add("diskFileID", request.diskFileID)
     .add("fileSize", request.fileSize)
     .add("fileId", catalogueInfo.fileId);
  // One "tapePool<copyNb>" parameter per requested tape copy.
  for (auto & ctp: catalogueInfo.copyToPoolMap) {
    std::stringstream tp;
    tp << "tapePool" << ctp.first;
    spc.add(tp.str(), ctp.second);
  }
  // Long URLs/blobs are ellipsized to keep log lines bounded.
  spc.add("policyName", catalogueInfo.mountPolicy.name)
     .add("policyArchiveMinAge", catalogueInfo.mountPolicy.archiveMinRequestAge)
     .add("policyArchivePriority", catalogueInfo.mountPolicy.archivePriority)
     .add("policyMaxDrives", catalogueInfo.mountPolicy.maxDrivesAllowed)
     .add("diskFilePath", request.diskFileInfo.path)
     .add("diskFileOwner", request.diskFileInfo.owner)
     .add("diskFileGroup", request.diskFileInfo.group)
     .add("diskFileRecoveryBlob", postEllipsis(request.diskFileInfo.recoveryBlob, 20))
     .add("checksumValue", request.checksumValue)
     .add("checksumType", request.checksumType)
     .add("archiveReportURL", midEllipsis(request.archiveReportURL, 50, 15))
     .add("archiveErrorReportURL", midEllipsis(request.archiveErrorReportURL, 50, 15))
     .add("creationHost", request.creationLog.host)
     .add("creationTime", request.creationLog.time)
     .add("creationUser", request.creationLog.username)
     .add("requesterName", request.requester.name)
     .add("requesterGroup", request.requester.group)
     .add("srcURL", midEllipsis(request.srcURL, 50, 15))
     .add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "Queued archive request");
}

175
//------------------------------------------------------------------------------
// queueRetrieve
//------------------------------------------------------------------------------
// Queues a retrieve request. For normal (non-repack) requests the catalogue
// supplies the full queue criteria (including mount policy); for repack
// requests only the archive file record is fetched and a default repack mount
// policy is substituted. The scheduler DB selects the tape (vid) to queue on.
void Scheduler::queueRetrieve(
  const std::string &instanceName,
  const common::dataStructures::RetrieveRequest &request,
  log::LogContext & lc) {
  using utils::postEllipsis;
  using utils::midEllipsis;
  utils::Timer t;
  // Get the queue criteria
  common::dataStructures::RetrieveFileQueueCriteria queueCriteria;
  if(!request.isRepack){
    queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc);
  } else {
    //Repack does not need policy
    queueCriteria.archiveFile = m_catalogue.getArchiveFileById(request.archiveFileID);
    queueCriteria.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack;
  }
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
  // The DB decides which tape copy (vid) the request is queued against.
  std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc);
  auto schedulerDbTime = t.secs();
  // Everything below is logging only.
  log::ScopedParamContainer spc(lc);
  spc.add("fileId", request.archiveFileID)
     .add("instanceName", instanceName)
     .add("diskFilePath", request.diskFileInfo.path)
     .add("diskFileOwner", request.diskFileInfo.owner)
     .add("diskFileGroup", request.diskFileInfo.group)
     .add("diskFileRecoveryBlob", postEllipsis(request.diskFileInfo.recoveryBlob, 20))
     .add("dstURL", request.dstURL)
     .add("errorReportURL", request.errorReportURL)
     .add("creationHost", request.creationLog.host)
     .add("creationTime", request.creationLog.time)
     .add("creationUser", request.creationLog.username)
     .add("requesterName", request.requester.name)
     .add("requesterGroup", request.requester.group)
     .add("criteriaArchiveFileId", queueCriteria.archiveFile.archiveFileID)
     .add("criteriaChecksumType", queueCriteria.archiveFile.checksumType)
     .add("criteriaChecksumValue", queueCriteria.archiveFile.checksumValue)
     .add("criteriaCreationTime", queueCriteria.archiveFile.creationTime)
     .add("criteriaDiskFileId", queueCriteria.archiveFile.diskFileId)
     .add("criteriaDiskFilePath", queueCriteria.archiveFile.diskFileInfo.path)
     .add("criteriaDiskFileOwner", queueCriteria.archiveFile.diskFileInfo.owner)
     .add("criteriaDiskRecoveryBlob", postEllipsis(queueCriteria.archiveFile.diskFileInfo.recoveryBlob, 20))
     .add("criteriaDiskInstance", queueCriteria.archiveFile.diskInstance)
     .add("criteriaFileSize", queueCriteria.archiveFile.fileSize)
     .add("reconciliationTime", queueCriteria.archiveFile.reconciliationTime)
     .add("storageClass", queueCriteria.archiveFile.storageClass);
  // One "tapeCopy<copyNb>" parameter per existing tape copy of the file.
  for (auto & tf:queueCriteria.archiveFile.tapeFiles) {
    std::stringstream tc;
    tc << "tapeCopy" << tf.first;
    spc.add(tc.str(), tf.second);
  }
  spc.add("selectedVid", selectedVid)
     .add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  // Policy parameters are only meaningful for non-repack requests (repack uses
  // the hard-coded default policy above).
  if(!request.isRepack){
      spc.add("policyName", queueCriteria.mountPolicy.name)
     .add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
     .add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
     .add("policyPriority", queueCriteria.mountPolicy.retrievePriority);
  }
  lc.log(log::INFO, "Queued retrieve request");
}

240
241
242
243
244
245
// NOTE(review): empty stub — queueing retrieve requests for specific copy
// numbers during repack is not implemented; calling this is currently a silent
// no-op. Confirm whether it should throw "Not implemented" like the other
// unimplemented Scheduler methods in this file, or be wired into
// expandRepackRequest().
void Scheduler::queueRetrieveRequestForRepack(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
  std::list<uint64_t> copyNbs, log::LogContext &lc)
{

}

246
//------------------------------------------------------------------------------
247
// deleteArchive
248
//------------------------------------------------------------------------------
249
250
void Scheduler::deleteArchive(const std::string &instanceName, const common::dataStructures::DeleteArchiveRequest &request, 
    log::LogContext & lc) {
251
252
253
  // We have different possible scenarios here. The file can be safe in the catalogue,
  // fully queued, or partially queued.
  // First, make sure the file is not queued anymore.
254
255
256
257
258
259
260
261
262
263
264
265
// TEMPORARILY commenting out SchedulerDatabase::deleteArchiveRequest() in order
// to reduce latency.  PLEASE NOTE however that this means files "in-flight" to
// tape will not be deleted and they will appear in the CTA catalogue when they
// are finally written to tape.
//try {
//  m_db.deleteArchiveRequest(instanceName, request.archiveFileID);
//} catch (exception::Exception &dbEx) {
//  // The file was apparently not queued. If we fail to remove it from the
//  // catalogue for any reason other than it does not exist in the catalogue,
//  // then it is an error.
//  m_catalogue.deleteArchiveFile(instanceName, request.archiveFileID);
//}
266
267
268
269
270
271
  utils::Timer t;
  m_catalogue.deleteArchiveFile(instanceName, request.archiveFileID, lc);
  auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime);
  lc.log(log::INFO, "In Scheduler::deleteArchive(): success.");
272
273
274
}

//------------------------------------------------------------------------------
// cancelRetrieve
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
void Scheduler::cancelRetrieve(const std::string &instanceName, const common::dataStructures::CancelRetrieveRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// updateFileInfo
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
void Scheduler::updateFileInfo(const std::string &instanceName, const common::dataStructures::UpdateFileInfoRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}
287

288
//------------------------------------------------------------------------------
// updateFileStorageClass
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
void Scheduler::updateFileStorageClass(const std::string &instanceName, const common::dataStructures::UpdateFileStorageClassRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// listStorageClass
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
std::list<common::dataStructures::StorageClass> Scheduler::listStorageClass(const std::string &instanceName, const common::dataStructures::ListStorageClassRequest &request) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// queueLabel (labelTape)
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
void Scheduler::queueLabel(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, const bool force, const bool lbp) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}
308

309
310
311
//------------------------------------------------------------------------------
// repack
//------------------------------------------------------------------------------
312
void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, 
313
    const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) {
314
315
316
317
318
319
320
321
  // Check request sanity
  if (vid.empty()) throw exception::UserError("Empty VID name.");
  if (bufferURL.empty()) throw exception::UserError("Empty buffer URL.");
  utils::Timer t;
  m_db.queueRepack(vid, bufferURL, repackType, lc);
  log::TimingList tl;
  tl.insertAndReset("schedulerDbTime", t);
  log::ScopedParamContainer params(lc);
322
  params.add("tapeVid", vid)
323
324
325
        .add("repackType", toString(repackType));
  tl.addToLog(params);
  lc.log(log::INFO, "In Scheduler::queueRepack(): success.");
326
327
328
329
330
}

//------------------------------------------------------------------------------
// cancelRepack
//------------------------------------------------------------------------------
// Cancels the repack request for the given vid in the scheduler DB.
// cliIdentity is currently unused — no authorization check is done here.
void Scheduler::cancelRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, log::LogContext & lc) {
  m_db.cancelRepack(vid, lc);
}

//------------------------------------------------------------------------------
// getRepacks
//------------------------------------------------------------------------------
// Returns information on all repack requests known to the scheduler DB.
std::list<common::dataStructures::RepackInfo> Scheduler::getRepacks() {
  return m_db.getRepackInfo();
}

//------------------------------------------------------------------------------
// getRepack
//------------------------------------------------------------------------------
// Returns information on the repack request for one tape (vid).
common::dataStructures::RepackInfo Scheduler::getRepack(const std::string &vid) {
  return m_db.getRepackInfo(vid);
}

349
350
351
352
353
354
//------------------------------------------------------------------------------
// promoteRepackRequestsToToExpand
//------------------------------------------------------------------------------
// Promotes pending repack requests to the "to expand" state until at least
// targetAvailableRequests requests are available for processing (ToExpand or
// Starting). A no-lock statistics read is used as a cheap dry run; the locked
// statistics are only taken (and re-checked) when promotion looks necessary.
void Scheduler::promoteRepackRequestsToToExpand(log::LogContext & lc) {
  // We target 2 fresh requests available for processing (ToExpand or Starting).
  const size_t targetAvailableRequests = 2;
  // Dry-run test to check if promotion is needed.
  auto repackStatsNL = m_db.getRepackStatisticsNoLock();
  // Statistics are supposed to be initialized for each status value. We only try to
  // expand if there are requests available in Pending status.
  typedef common::dataStructures::RepackInfo::Status Status;
  if (repackStatsNL->at(Status::Pending) &&
          (targetAvailableRequests > repackStatsNL->at(Status::ToExpand) + repackStatsNL->at(Status::Starting))) {
    // Let's try to promote a repack request. Take the lock.
    repackStatsNL.reset();
    decltype(m_db.getRepackStatistics()) repackStats;
    try {
      repackStats = m_db.getRepackStatistics();
    } catch (SchedulerDatabase::RepackRequestStatistics::NoPendingRequestQueue &) {
      // Nothing to promote after all.
      return;
    }
    // Re-check under the lock: the statistics may have changed since the
    // no-lock read above.
    if (repackStats->at(Status::Pending) &&
            (targetAvailableRequests > repackStats->at(Status::ToExpand) + repackStats->at(Status::Starting))) {
      auto requestsToPromote = targetAvailableRequests;
      requestsToPromote -= repackStats->at(Status::ToExpand);
      requestsToPromote -= repackStats->at(Status::Starting);
      auto stats = repackStats->promotePendingRequestsForExpansion(requestsToPromote, lc);
      log::ScopedParamContainer params(lc);
      // Log key fixed from the misspelled "toEnpandBefore"; the statistics
      // structure member keeps its historical spelling (declared elsewhere).
      params.add("promotedRequests", stats.promotedRequests)
            .add("pendingBefore", stats.pendingBefore)
            .add("toExpandBefore", stats.toEnpandBefore)
            .add("pendingAfter", stats.pendingAfter)
            .add("toExpandAfter", stats.toExpandAfter);
      lc.log(log::INFO,"In Scheduler::promoteRepackRequestsToToExpand(): Promoted repack request to \"to expand\"");
    }
  }
}

388
//------------------------------------------------------------------------------
389
// getNextRepackRequestToExpand
390
//------------------------------------------------------------------------------
391
std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() {
392
    std::unique_ptr<cta::SchedulerDatabase::RepackRequest> repackRequest;
393
    repackRequest = m_db.getNextRepackJobToExpand();
394
395
396
397
398
399
    if(repackRequest != nullptr){
      std::unique_ptr<RepackRequest> ret(new RepackRequest());
      ret->m_dbReq.reset(repackRequest.release());
      return ret;
    }
    return nullptr;
400
401
}

402
403
404
405
406
407
// Builds the destination URL for a repack retrieve: the disk file path under
// the "repack:/" scheme.
const std::string Scheduler::generateRetrieveDstURL(const cta::common::dataStructures::DiskFileInfo dfi) const{
  return std::string("repack:/") + dfi.path;
}

408
409
410
//------------------------------------------------------------------------------
// expandRepackRequest
//------------------------------------------------------------------------------
// Expands a repack request into individual retrieve requests: walks the tape
// (vid) in fSeq order in batches of c_defaultMaxNbFilesForRepack files, and
// queues one repack-flavoured retrieve per archive file found. The TimingList
// and Timer parameters are currently unused.
void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext& lc) {
  uint64_t fseq = c_defaultFseqForRepack;
  std::list<common::dataStructures::ArchiveFile> files;
  auto vid = repackRequest->getRepackInfo().vid;
  while(true) {
    // Fetch the next batch of files on this tape starting at fseq.
    files = m_catalogue.getFilesForRepack(vid,fseq,c_defaultMaxNbFilesForRepack);
    for(auto &archiveFile : files)
    {
      // Queue a retrieve marked as repack, destined for the repack buffer URL.
      cta::common::dataStructures::RetrieveRequest retrieveRequest;
      retrieveRequest.archiveFileID = archiveFile.archiveFileID;
      retrieveRequest.diskFileInfo = archiveFile.diskFileInfo;
      retrieveRequest.dstURL = generateRetrieveDstURL(archiveFile.diskFileInfo);
      retrieveRequest.isRepack = true;
      queueRetrieve(archiveFile.diskInstance,retrieveRequest,lc);
    }
    if (files.size()) {
      // Resume the next batch after the last file's copy on THIS tape: find
      // the tape file whose vid matches and advance one past its fSeq.
      // NOTE(review): find_if assumes a matching copy always exists for files
      // returned by getFilesForRepack — the iterator is dereferenced
      // unchecked.
      auto & tf=files.back().tapeFiles;
      fseq = std::find_if(tf.cbegin(), tf.cend(), [vid](decltype(*(tf.cbegin())) &f){ return f.second.vid == vid; })->second.fSeq + 1;
    } else break;
  }
}


434

435
436
437
//------------------------------------------------------------------------------
// shrink
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
void Scheduler::shrink(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &tapepool) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// verify
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
void Scheduler::queueVerify(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, const optional<uint64_t> numberOfFiles) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// cancelVerify
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
void Scheduler::cancelVerify(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid) {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// getVerifys
//------------------------------------------------------------------------------
// Verification listing is not implemented; returns an empty list (rather than
// throwing like the other verify stubs — presumably so callers listing jobs
// see "none" instead of an error; confirm this is intentional).
std::list<common::dataStructures::VerifyInfo> Scheduler::getVerifys(const common::dataStructures::SecurityIdentity &cliIdentity) const {
  return std::list<common::dataStructures::VerifyInfo>(); 
}

//------------------------------------------------------------------------------
// getVerify
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
common::dataStructures::VerifyInfo Scheduler::getVerify(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid) const {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

470
//------------------------------------------------------------------------------
// readTest
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
common::dataStructures::ReadTestResult Scheduler::readTest(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &driveName, const std::string &vid,
        const uint64_t firstFSeq, const uint64_t lastFSeq, const bool checkChecksum, const std::string &output) const {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// writeTest
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
common::dataStructures::WriteTestResult Scheduler::writeTest(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &driveName, const std::string &vid,
        const std::string &inputFile) const {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// write_autoTest
//------------------------------------------------------------------------------
// Not implemented: unconditionally throws, naming this function.
common::dataStructures::WriteTestResult Scheduler::write_autoTest(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &driveName, const std::string &vid,
        const uint64_t numberOfFiles, const uint64_t fileSize, const common::dataStructures::TestSourceType testSourceType) const {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
495
// getDesiredDriveState
496
//------------------------------------------------------------------------------
497
498
common::dataStructures::DesiredDriveState Scheduler::getDesiredDriveState(const std::string& driveName, log::LogContext & lc) {
  utils::Timer t;
499
  auto driveStates = m_db.getDriveStates(lc);
500
501
  for (auto & d: driveStates) {
    if (d.driveName == driveName) {
502
      auto schedulerDbTime = t.secs();
503
      log::ScopedParamContainer spc(lc);
504
505
      spc.add("drive", driveName)
         .add("schedulerDbTime", schedulerDbTime);
506
      lc.log(log::INFO, "In Scheduler::getDesiredDriveState(): success.");
507
508
509
510
511
512
513
514
515
      return d.desiredDriveState;
    }
  }
  throw NoSuchDrive ("In Scheduler::getDesiredDriveState(): no such drive");
}

//------------------------------------------------------------------------------
// setDesiredDriveState
//------------------------------------------------------------------------------
516
void Scheduler::setDesiredDriveState(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &driveName, const bool up, const bool force, log::LogContext & lc) {
517
  utils::Timer t;
518
519
520
  common::dataStructures::DesiredDriveState desiredDriveState;
  desiredDriveState.up = up;
  desiredDriveState.forceDown = force;
521
  m_db.setDesiredDriveState(driveName, desiredDriveState, lc);
522
523
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
524
525
  spc.add("drive", driveName)
     .add("up", up?"up":"down")
526
527
528
     .add("force", force?"yes":"no")
     .add("schedulerDbTime", schedulerDbTime);
   lc.log(log::INFO, "In Scheduler::setDesiredDriveState(): success.");   
529
530
}

531
532
533
534
535
536
//------------------------------------------------------------------------------
// removeDrive
//------------------------------------------------------------------------------
// Removes the named drive's entry from the scheduler DB.
void Scheduler::removeDrive(const common::dataStructures::SecurityIdentity &cliIdentity, 
  const std::string &driveName, log::LogContext & lc) {
  utils::Timer timer;
  m_db.removeDrive(driveName, lc);
  const auto schedulerDbTime = timer.secs();
  log::ScopedParamContainer params(lc);
  params.add("drive", driveName);
  params.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::removeDrive(): success.");
}

545
546
547
//------------------------------------------------------------------------------
// setDesiredDriveState
//------------------------------------------------------------------------------
548
void Scheduler::reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo, common::dataStructures::MountType type, common::dataStructures::DriveStatus status, log::LogContext & lc) {
549
  // TODO: mount type should be transmitted too.
550
  utils::Timer t;
551
  m_db.reportDriveStatus(driveInfo, type, status, time(NULL), lc);
552
553
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
554
555
  spc.add("drive", driveInfo.driveName)
     .add("schedulerDbTime", schedulerDbTime);
Eric Cano's avatar
Eric Cano committed
556
  lc.log(log::INFO, "In Scheduler::reportDriveStatus(): success.");
557
558
559
560
561
}

//------------------------------------------------------------------------------
// getPendingArchiveJobs
//------------------------------------------------------------------------------
Eric Cano's avatar
Eric Cano committed
562
563
564
565
566
567
568
569
std::map<std::string, std::list<common::dataStructures::ArchiveJob> > Scheduler::getPendingArchiveJobs(log::LogContext & lc) const {
  utils::Timer t;
  auto ret = m_db.getArchiveJobs();
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(): success.");
  return ret;
570
571
572
573
574
}

//------------------------------------------------------------------------------
// getPendingArchiveJobs
//------------------------------------------------------------------------------
Eric Cano's avatar
Eric Cano committed
575
576
std::list<common::dataStructures::ArchiveJob> Scheduler::getPendingArchiveJobs(const std::string &tapePoolName, log::LogContext & lc) const {
  utils::Timer t;
577
  if(!m_catalogue.tapePoolExists(tapePoolName)) {
578
    throw exception::UserError(std::string("Tape pool ") + tapePoolName + " does not exist");
579
  }
Eric Cano's avatar
Eric Cano committed
580
581
582
583
584
585
586
587
  auto catalogueTime = t.secs(utils::Timer::resetCounter);
  auto ret = m_db.getArchiveJobs(tapePoolName);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("catalogueTime", catalogueTime)
     .add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(tapePool): success.");
  return ret;
588
589
590
591
592
}

//------------------------------------------------------------------------------
// getPendingRetrieveJobs
//------------------------------------------------------------------------------
Eric Cano's avatar
Eric Cano committed
593
594
595
596
597
598
599
600
std::map<std::string, std::list<common::dataStructures::RetrieveJob> > Scheduler::getPendingRetrieveJobs(log::LogContext & lc) const {
  utils::Timer t;
  auto ret =  m_db.getRetrieveJobs();
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingRetrieveJobs(): success.");
  return ret;
601
602
603
604
605
}

//------------------------------------------------------------------------------
// getPendingRetrieveJobs
//------------------------------------------------------------------------------
Eric Cano's avatar
Eric Cano committed
606
std::list<common::dataStructures::RetrieveJob> Scheduler::getPendingRetrieveJobs(const std::string& vid, log::LogContext &lc) const {
607
608
609
610
611
612
613
  utils::Timer t;
  auto ret =  m_db.getRetrieveJobs(vid);
  auto schedulerDbTime = t.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getPendingRetrieveJobs(): success.");
  return ret;
614
615
}

616
617
618
//------------------------------------------------------------------------------
// getDriveStates
//------------------------------------------------------------------------------
619
620
std::list<common::dataStructures::DriveState> Scheduler::getDriveStates(const common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc) const {
  // Returns the state of every drive known to the scheduler database.
  // NOTE(review): cliIdentity is accepted but not consulted here — presumably
  // kept for interface symmetry; confirm before removing.
  utils::Timer timer;
  auto states = m_db.getDriveStates(lc);
  const auto schedulerDbTime = timer.secs();
  log::ScopedParamContainer spc(lc);
  spc.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "In Scheduler::getDriveStates(): success.");
  return states;
}
628
629

//------------------------------------------------------------------------------
630
// sortAndGetTapesForMountInfo
631
//------------------------------------------------------------------------------
632
633
634
635
void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>& mountInfo,
    const std::string & logicalLibraryName, const std::string & driveName, utils::Timer & timer,
    std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList,
    double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc) {
636
637
638
639
640
  // The library information is not know for the tapes involved in retrieves. We 
  // need to query the catalogue now about all those tapes.
  // Build the list of tapes.
  std::set<std::string> tapeSet;
  for (auto &m:mountInfo->potentialMounts) {
641
    if (m.type==common::dataStructures::MountType::Retrieve) tapeSet.insert(m.vid);
642
643
644
  }
  if (tapeSet.size()) {
    auto tapesInfo=m_catalogue.getTapesByVid(tapeSet);
645
    getTapeInfoTime = timer.secs(utils::Timer::resetCounter);
646
    for (auto &m:mountInfo->potentialMounts) {
647
      if (m.type==common::dataStructures::MountType::Retrieve) {
648
649
        m.logicalLibrary=tapesInfo[m.vid].logicalLibraryName;
        m.tapePool=tapesInfo[m.vid].tapePoolName;
650
651
652
653
        m.vendor = tapesInfo[m.vid].vendor;
        m.mediaType = tapesInfo[m.vid].mediaType;
        m.vo = tapesInfo[m.vid].vo;
        m.capacityInBytes = tapesInfo[m.vid].capacityInBytes;
654
655
656
657
      }
    }
  }
  
658
659
660
661
662
  // We should now filter the potential mounts to keep only the ones we are
  // compatible with (match the logical library for retrieves).
  // We also only want the potential mounts for which we still have 
  // We cannot filter the archives yet
  for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
663
    if (m->type == common::dataStructures::MountType::Retrieve && m->logicalLibrary != logicalLibraryName) {
664
665
666
667
668
669
670
671
      m = mountInfo->potentialMounts.erase(m);
    } else {
      m++;
    }
  }
  
  // With the existing mount list, we can now populate the potential mount list
  // with the per tape pool existing mount statistics.
672
  for (auto & em: mountInfo->existingOrNextMounts) {
673
674
675
676
677
678
679
    // If a mount is still listed for our own drive, it is a leftover that we disregard.
    if (em.driveName!=driveName) {
      try {
        existingMountsSummary.at(tpType(em.tapePool, em.type))++;
      } catch (std::out_of_range &) {
        existingMountsSummary[tpType(em.tapePool, em.type)] = 1;
      }
680
681
682
      if (em.vid.size()) {
        tapesInUse.insert(em.vid);
        log::ScopedParamContainer params(lc);
683
        params.add("tapeVid", em.vid)
Eric Cano's avatar
Eric Cano committed
684
685
              .add("mountType", common::dataStructures::toString(em.type))
              .add("drive", em.driveName);
686
        lc.log(log::DEBUG,"In Scheduler::sortAndGetTapesForMountInfo(): tapeAlreadyInUse found.");
687
      }
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
    }
  }
  
  // We can now filter out the potential mounts for which their mount criteria
  // is already met, filter out the potential mounts for which the maximum mount
  // quota is already reached, and weight the remaining by how much of their quota 
  // is reached
  for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
    // Get summary data
    uint32_t existingMounts;
    try {
      existingMounts = existingMountsSummary.at(tpType(m->tapePool, m->type));
    } catch (std::out_of_range &) {
      existingMounts = 0;
    } 
703
704
    uint32_t effectiveExistingMounts = 0;
    if (m->type == common::dataStructures::MountType::Archive) effectiveExistingMounts = existingMounts;
705
    bool mountPassesACriteria = false;
706
707
    
    if (m->bytesQueued / (1 + effectiveExistingMounts) >= m_minBytesToWarrantAMount)
708
      mountPassesACriteria = true;
709
    if (m->filesQueued / (1 + effectiveExistingMounts) >= m_minFilesToWarrantAMount)
710
      mountPassesACriteria = true;
711
    if (!effectiveExistingMounts && ((time(NULL) - m->oldestJobStartTime) > m->minRequestAge))
712
      mountPassesACriteria = true;
713
    if (!mountPassesACriteria || existingMounts >= m->maxDrivesAllowed) {
714
      log::ScopedParamContainer params(lc);
715
      params.add("tapePool", m->tapePool);
716
      if ( m->type == common::dataStructures::MountType::Retrieve) {
717
        params.add("tapeVid", m->vid);
718
719
      }
      params.add("mountType", common::dataStructures::toString(m->type))
720
721
722
723
724
725
            .add("existingMounts", existingMounts)
            .add("bytesQueued", m->bytesQueued)
            .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
            .add("filesQueued", m->filesQueued)
            .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
            .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
726
            .add("minArchiveRequestAge", m->minRequestAge)
Eric Cano's avatar
Eric Cano committed
727
728
            .add("existingMounts", existingMounts)
            .add("maxDrivesAllowed", m->maxDrivesAllowed);
729
      lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Removing potential mount not passing criteria");
730
731
732
      m = mountInfo->potentialMounts.erase(m);
    } else {
      // populate the mount with a weight 
733
      m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->maxDrivesAllowed;
Eric Cano's avatar
Eric Cano committed
734
      log::ScopedParamContainer params(lc);
735
      params.add("tapePool", m->tapePool);
736
      if ( m->type == common::dataStructures::MountType::Retrieve) {
737
        params.add("tapeVid", m->vid);
738
739
      }
      params.add("mountType", common::dataStructures::toString(m->type))
Eric Cano's avatar
Eric Cano committed
740
741
742
743
744
745
            .add("existingMounts", existingMounts)
            .add("bytesQueued", m->bytesQueued)
            .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
            .add("filesQueued", m->filesQueued)
            .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
            .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
746
            .add("minArchiveRequestAge", m->minRequestAge)
Eric Cano's avatar
Eric Cano committed
747
748
749
            .add("existingMounts", existingMounts)
            .add("maxDrivesAllowed", m->maxDrivesAllowed)
            .add("ratioOfMountQuotaUsed", m->ratioOfMountQuotaUsed);
750
      lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Will consider potential mount");
751
752
753
754
755
756
757
758
759
760
      m++;
   }
  }
  
  // We can now sort the potential mounts in decreasing priority order. 
  // The ordering is defined in operator <.
  // We want the result in descending order or priority so we reverse the vector
  std::sort(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
  std::reverse(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
  
761
762
  candidateSortingTime = timer.secs(utils::Timer::resetCounter);
  
763
764
765
766
  // Find out if we have any potential archive mount in the list. If so, get the
  // list of tapes from the catalogue.
  if (std::count_if(
        mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(), 
767
        [](decltype(*mountInfo->potentialMounts.cbegin())& m){ return m.type == common::dataStructures::MountType::Archive; } )) {
768
    tapeList = m_catalogue.getTapesForWriting(logicalLibraryName);
769
    getTapeForWriteTime = timer.secs(utils::Timer::resetCounter);
770
  }
771
        
772
773
774
775
776
777
778
779
780
  // Remove from the tape list the ones already or soon to be mounted
  auto t=tapeList.begin();
  while (t!=tapeList.end()) {
    if (tapesInUse.count(t->vid)) {
      t=tapeList.erase(t);
    } else {
      t++;
    }
  }
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
}

//------------------------------------------------------------------------------
// getNextMountDryRun
//------------------------------------------------------------------------------
bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const std::string& driveName, log::LogContext& lc) {
  // Answers "would getNextMount() find work for this drive?" without creating
  // a mount or touching drive state.
  // We run the same algorithm as the actual getNextMount without the global lock
  // For this reason, we just return true as soon as valid mount has been found.
  utils::Timer timer;
  // Timing accumulators, reported in the DEBUG log lines below.
  double getMountInfoTime = 0;
  double getTapeInfoTime = 0;
  double candidateSortingTime = 0;
  double getTapeForWriteTime = 0;
  double decisionTime = 0;
  double schedulerDbTime = 0;
  double catalogueTime = 0;
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
  // Lock-free snapshot of queues and existing mounts (dry run only).
  mountInfo = m_db.getMountInfoNoLock(lc);
  getMountInfoTime = timer.secs(utils::Timer::resetCounter);
  std::map<tpType, uint32_t> existingMountsSummary;
  std::set<std::string> tapesInUse;
  std::list<catalogue::TapeForWriting> tapeList;
  
  // Shared candidate preparation: completes/filters/sorts potentialMounts and
  // fills the three outputs above plus the timing accumulators.
  sortAndGetTapesForMountInfo(mountInfo, logicalLibraryName, driveName, timer,
      existingMountsSummary, tapesInUse, tapeList,
      getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc);
  
  // We can now simply iterate on the candidates until we manage to find a valid mount
  for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
    // If the mount is an archive, we still have to find a tape.
    if (m->type==common::dataStructures::MountType::Archive) {
      // We need to find a tape for archiving. It should be both in the right 
      // tape pool and in the drive's logical library
      // The first tape matching will go for a prototype.
      // TODO: improve to reuse already partially written tapes and randomization
      for (auto & t: tapeList) {
        if (t.tapePool == m->tapePool) {
          // We have our tape. That's enough.
          decisionTime += timer.secs(utils::Timer::resetCounter);
          schedulerDbTime = getMountInfoTime;
          catalogueTime = getTapeInfoTime + getTapeForWriteTime;
          uint32_t existingMounts = 0;
          // Absent key simply means no existing mounts for this (pool, type).
          try {
            existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type));
          } catch (...) {}
          log::ScopedParamContainer params(lc);
          params.add("tapePool", m->tapePool)
                .add("tapeVid", t.vid)
                .add("mountType", common::dataStructures::toString(m->type))
                .add("existingMounts", existingMounts)
                .add("bytesQueued", m->bytesQueued)
                .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
                .add("filesQueued", m->filesQueued)
                .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
                .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
                .add("minArchiveRequestAge", m->minRequestAge)
                .add("getMountInfoTime", getMountInfoTime)
                .add("getTapeInfoTime", getTapeInfoTime)
                .add("candidateSortingTime", candidateSortingTime)
                .add("getTapeForWriteTime", getTapeForWriteTime)
                .add("decisionTime", decisionTime)
                .add("schedulerDbTime", schedulerDbTime)
                .add("catalogueTime", catalogueTime);
          lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): Found a potential mount (archive)");
          return true;
        }
      }
    } else if (m->type==common::dataStructures::MountType::Retrieve) {
      // We know the tape we intend to mount. We have to validate the tape is 
      // actually available to read (not mounted or about to be mounted, and pass 
      // on it if so).
      if (tapesInUse.count(m->vid)) continue;
      decisionTime += timer.secs(utils::Timer::resetCounter);
      log::ScopedParamContainer params(lc);
      uint32_t existingMounts = 0;
      // Absent key simply means no existing mounts for this (pool, type).
      try {
        existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type));
      } catch (...) {}
      schedulerDbTime = getMountInfoTime;
      catalogueTime = getTapeInfoTime + getTapeForWriteTime;
      params.add("tapePool", m->tapePool)
            .add("tapeVid", m->vid)
            .add("mountType", common::dataStructures::toString(m->type))
            .add("existingMounts", existingMounts)
            .add("bytesQueued", m->bytesQueued)
            .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
            .add("filesQueued", m->filesQueued)
            .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
            .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
            .add("minArchiveRequestAge", m->minRequestAge)
            .add("getMountInfoTime", getMountInfoTime)
            .add("getTapeInfoTime", getTapeInfoTime)
            .add("candidateSortingTime", candidateSortingTime)
            .add("getTapeForWriteTime", getTapeForWriteTime)
            .add("decisionTime", decisionTime)
            .add("schedulerDbTime", schedulerDbTime)
            .add("catalogueTime", catalogueTime);
      lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): Found a potential mount (retrieve)");
      return true;
    }
  }
  // No candidate survived: log the timing breakdown and report no work.
  schedulerDbTime = getMountInfoTime;
  catalogueTime = getTapeInfoTime + getTapeForWriteTime;
  decisionTime += timer.secs(utils::Timer::resetCounter);
  log::ScopedParamContainer params(lc);
  params.add("getMountInfoTime", getMountInfoTime)
        .add("getTapeInfoTime", getTapeInfoTime)
        .add("candidateSortingTime", candidateSortingTime)
        .add("getTapeForWriteTime", getTapeForWriteTime)
        .add("decisionTime", decisionTime)
        .add("schedulerDbTime", schedulerDbTime)
        .add("catalogueTime", catalogueTime);
  lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): No valid mount found.");
  return false;
}


//------------------------------------------------------------------------------
// getNextMount
//------------------------------------------------------------------------------
std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc) {
  // In order to decide the next mount to do, we have to take a global lock on 
  // the scheduling, retrieve a list of all running mounts, queues sizes for 
  // tapes and tape pools, order the candidates by priority
  // below threshold, and pick one at a time, we then attempt to get a tape 
  // from the catalogue (for the archive mounts), and walk the list until we
  // mount or find nothing to do.
  // We then skip to the next candidate, until we find a suitable one and
  // return the mount, or exhaust all of them an 
  // Many steps for this logic are not specific for the database and are hence
  // implemented in the scheduler itself.
  // First, get the mount-related info from the DB
  utils::Timer timer;
  double getMountInfoTime = 0;
  double queueTrimingTime = 0;
  double getTapeInfoTime = 0;
  double candidateSortingTime = 0;
  double getTapeForWriteTime = 0;
  double decisionTime = 0;
  double mountCreationTime = 0;
  double driveStatusSetTime = 0;
  double schedulerDbTime = 0;
  double catalogueTime = 0;
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
  mountInfo = m_db.getMountInfo(lc);
  getMountInfoTime = timer.secs(utils::Timer::resetCounter);
  if (mountInfo->queueTrimRequired) {
    m_db.trimEmptyQueues(lc);
    queueTrimingTime = timer.secs(utils::Timer::resetCounter);
  }
  __attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo;
  
  std::map<tpType, uint32_t> existingMountsSummary;
  std::set<std::string> tapesInUse;
  std::list<catalogue::TapeForWriting> tapeList;
  
  sortAndGetTapesForMountInfo(mountInfo, logicalLibraryName, driveName, timer,
      existingMountsSummary, tapesInUse, tapeList,
      getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc);
940

941
942
943
944
  // We can now simply iterate on the candidates until we manage to create a
  // mount for one of them
  for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
    // If the mount is an archive, we still have to find a tape.
945
    if (m->type==common::dataStructures::MountType::Archive) {
946
947
948
      // We need to find a tape for archiving. It should be both in the right 
      // tape pool and in the drive's logical library
      // The first tape matching will go for a prototype.
949
950
951
      // TODO: improve to reuse already partially written tapes and randomization
      for (auto & t: tapeList) {
        if (t.tapePool == m->tapePool) {
952
953
          // We have our tape. Try to create the session. Prepare a return value
          // for it.
954
          std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount(m_catalogue));
955
956
          // Get the db side of the session
          try {
957
            decisionTime += timer.secs(utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
958
            internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t,
959
960
                driveName, 
                logicalLibraryName, 
961
962
963
964
965
                utils::getShortHostname(),
                t.vo,
                t.mediaType,
                t.vendor,
                t.capacityInBytes,
966
                time(NULL)).release());
967
            mountCreationTime += timer.secs(utils::Timer::resetCounter);
968
            internalRet->m_sessionRunning = true;
969
            internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting);
970
            driveStatusSetTime += timer.secs(utils::Timer::resetCounter);
971
972
973
974
975
            log::ScopedParamContainer params(lc);
            uint32_t existingMounts = 0;
            try {
              existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type));
            } catch (...) {}
976
977
            schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime;
            catalogueTime = getTapeInfoTime + getTapeForWriteTime;
978
            
979
            params.add("tapePool", m->tapePool)
980
                  .add("tapeVid", t.vid)
981
982
983
                  .add("vo",t.vo)
                  .add("mediaType",t.mediaType)
                  .add("vendor",t.vendor)
984
985
986
987
988
989
990
                  .add("mountType", common::dataStructures::toString(m->type))
                  .add("existingMounts", existingMounts)
                  .add("bytesQueued", m->bytesQueued)
                  .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
                  .add("filesQueued", m->filesQueued)
                  .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
                  .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
991
                  .add("minArchiveRequestAge", m->minRequestAge)
992
993
994
995
996
997
998
999
1000
                  .add("getMountInfoTime", getMountInfoTime)
                  .add("queueTrimingTime", queueTrimingTime)
                  .add("getTapeInfoTime", getTapeInfoTime)
                  .add("candidateSortingTime", candidateSortingTime)
                  .add("getTapeForWriteTime", getTapeForWriteTime)
                  .add("decisionTime", decisionTime)
                  .add("mountCreationTime", mountCreationTime)
                  .add("driveStatusSetTime", driveStatusSetTime)
                  .add("schedulerDbTime", schedulerDbTime)
For faster browsing, not all history is shown. View entire blame