/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "common/log/StdoutLogger.hpp"

#include "OStoreDB.hpp"
#include "MemQueues.hpp"
#include "objectstore/ArchiveQueueAlgorithms.hpp"
#include "objectstore/RepackQueueAlgorithms.hpp"
#include "objectstore/DriveRegister.hpp"
#include "objectstore/DriveState.hpp"
#include "objectstore/RepackRequest.hpp"
#include "objectstore/RepackIndex.hpp"
#include "objectstore/RepackQueue.hpp"
#include "objectstore/Sorter.hpp"
#include "objectstore/Helpers.hpp"
#include "common/exception/Exception.hpp"
#include "common/exception/UserError.hpp"
#include "common/utils/utils.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "common/dataStructures/MountPolicy.hpp"
#include "common/make_unique.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
#include "Scheduler.hpp"
#include "disk/DiskFile.hpp"
#include <algorithm>
#include <cmath>
#include <numeric>      /* std::accumulate */

#include <stdlib.h>     /* srand, rand */
#include <time.h>       /* time */
#include <stdexcept>
#include <set>
#include <iostream>
#include <memory>
#include "objectstore/AgentWrapper.hpp"

namespace cta {

using namespace objectstore;

//------------------------------------------------------------------------------
// OStoreDB::OStoreDB()
//------------------------------------------------------------------------------
59
OStoreDB::OStoreDB(objectstore::Backend& be, catalogue::Catalogue & catalogue, log::Logger &logger):
60
61
  m_taskQueueSize(0), m_taskPostingSemaphore(5), m_objectStore(be), m_catalogue(catalogue), m_logger(logger) {
  for (size_t i=0; i<5; i++) {
62
63
64
65
    m_enqueueingWorkerThreads.emplace_back(new EnqueueingWorkerThread(m_enqueueingTasksQueue));
    m_enqueueingWorkerThreads.back()->start();
  }
}
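
// Design note (a summary, not from the original sources): request queueing is
// split into a synchronous "top half" (create and insert the request object,
// done in the caller's thread) and an asynchronous "bottom half" (link the
// request to its destination queue), which runs on this pool of
// EnqueueingWorkerThreads. A minimal, illustrative sketch of how work is posted:
//
//   auto *et = new EnqueueingTask([=]{ /* bottom-half work */ });
//   m_enqueueingTasksQueue.push(et);  // consumed by EnqueueingWorkerThread::run()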

//------------------------------------------------------------------------------
// OStoreDB::~OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::~OStoreDB() throw() {
  // Wait for the bottom-half tasks to drain.
  while (m_taskQueueSize) sleep(1);
  // Post one poison pill (nullptr) per worker thread, then join and delete the threads.
  for (__attribute__((unused)) auto &t: m_enqueueingWorkerThreads) m_enqueueingTasksQueue.push(nullptr);
  for (auto &t: m_enqueueingWorkerThreads) {
    t->wait();
    delete t;
    t=nullptr;
  }
}

//------------------------------------------------------------------------------
// OStoreDB::setAgentReference()
//------------------------------------------------------------------------------
void OStoreDB::setAgentReference(objectstore::AgentReference *agentReference) {
  m_agentReference = agentReference;
}

//------------------------------------------------------------------------------
// OStoreDB::assertAgentAddressSet()
//------------------------------------------------------------------------------
void OStoreDB::assertAgentAddressSet() {
  if (!m_agentReference)
    throw AgentNotSet("In OStoreDB::assertAgentAddressSet: Agent address not set");
}

//------------------------------------------------------------------------------
// OStoreDB::waitSubthreadsComplete()
//------------------------------------------------------------------------------
void OStoreDB::waitSubthreadsComplete() {
  // This method is only used by unit tests so calling usleep() is good enough
  while (m_taskQueueSize) ::usleep(1000);
}

//------------------------------------------------------------------------------
// OStoreDB::setThreadNumber()
//------------------------------------------------------------------------------
void OStoreDB::setThreadNumber(uint64_t threadNumber) {
  // Clear all threads.
  for (__attribute__((unused)) auto &t: m_enqueueingWorkerThreads) m_enqueueingTasksQueue.push(nullptr);
  for (auto &t: m_enqueueingWorkerThreads) {
    t->wait();
    delete t;
    t=nullptr;
  }
  m_enqueueingWorkerThreads.clear();
  // Create the new ones.
  for (size_t i=0; i<threadNumber; i++) {
    m_enqueueingWorkerThreads.emplace_back(new EnqueueingWorkerThread(m_enqueueingTasksQueue));
    m_enqueueingWorkerThreads.back()->start();
  }
}

//------------------------------------------------------------------------------
// OStoreDB::setBottomHalfQueueSize()
//------------------------------------------------------------------------------
void OStoreDB::setBottomHalfQueueSize(uint64_t tasksNumber) {
  // 5 is the default queue size.
  m_taskPostingSemaphore.release(tasksNumber - 5);
}
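
// Illustrative sizing example (values inferred from the code above, not from
// the original comments): the semaphore starts with 5 permits (the constructor
// default), so setBottomHalfQueueSize(25) releases 20 extra permits, allowing
// up to 25 bottom-half tasks in flight before delayIfNecessary() blocks
// posters on the semaphore.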


//------------------------------------------------------------------------------
// OStoreDB::EnqueueingWorkerThread::run()
//------------------------------------------------------------------------------
void OStoreDB::EnqueueingWorkerThread::run() {
  while (true) {
    std::unique_ptr<EnqueueingTask> et(m_enqueueingTasksQueue.pop());
    // A nullptr task is the poison pill signalling thread shutdown.
    if (!et.get()) break;
    // The ANNOTATE_* macros document for Helgrind/DRD the happens-before
    // relation between the posting and the executing threads.
    ANNOTATE_HAPPENS_AFTER(et.get());
    (*et)();
    ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(et.get());
  }
}

//------------------------------------------------------------------------------
// OStoreDB::delayIfNecessary()
//------------------------------------------------------------------------------
void OStoreDB::delayIfNecessary(log::LogContext& lc) {
  bool delayInserted = false;
  utils::Timer t;
  uint64_t taskQueueSize = m_taskQueueSize;
  double sleepDelay = 0;
  double lockDelay = 0;
  if (m_taskQueueSize > 10000) {
    // Insert a linear delay: 0s at 10000 queued elements, growing to 10s at 25000.
    double delay = 10.0 * (taskQueueSize - 10000) / 15000;
    double delayInt;
    ::timespec ts;
    ts.tv_nsec = std::modf(delay, &delayInt) * 1000 * 1000 * 1000;
    ts.tv_sec = delayInt;
    nanosleep(&ts, nullptr);
    sleepDelay = t.secs(utils::Timer::resetCounter);
    delayInserted = true;
  }
  if (!m_taskPostingSemaphore.tryAcquire()) {
    m_taskPostingSemaphore.acquire();
    lockDelay = t.secs(utils::Timer::resetCounter);
    delayInserted = true;
  }
  if (delayInserted) {
    log::ScopedParamContainer params(lc);
    params.add("sleepDelay", sleepDelay)
          .add("lockDelay", lockDelay)
          .add("taskQueueSize", taskQueueSize);
    lc.log(log::INFO, "In OStoreDB::delayIfNecessary(): inserted delay.");
  }
}
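
// Worked example of the ramp above (illustrative): with m_taskQueueSize ==
// 17500, delay = 10.0 * (17500 - 10000) / 15000 = 5.0 seconds, which is split
// into ts.tv_sec = 5 and ts.tv_nsec = 0 for the nanosleep() call.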

//------------------------------------------------------------------------------
// OStoreDB::ping()
//------------------------------------------------------------------------------
void OStoreDB::ping() {
  // Validate that we can fetch the root entry.
  objectstore::RootEntry re(m_objectStore);
  re.fetchNoLock();
}

//------------------------------------------------------------------------------
// OStoreDB::fetchMountInfo()
//------------------------------------------------------------------------------
void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, RootEntry& re, SchedulerDatabase::PurposeGetMountInfo purpose,
    log::LogContext & logContext) {
  utils::Timer t, t2;
  std::list<common::dataStructures::MountPolicy> mountPolicies = m_catalogue.getCachedMountPolicies();
  // Walk the archive queues for USER for statistics
  for (auto & aqp: re.dumpArchiveQueues(JobQueueType::JobsToTransferForUser)) {
    objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) poolName = aqp.tapePool;
    objectstore::ScopedSharedLock aqlock;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    try {
      aqueue.fetchNoLock();
      queueFetchTime = t.secs(utils::Timer::resetCounter);
    } catch (cta::exception::Exception &ex) {
      log::ScopedParamContainer params (logContext);
      params.add("queueObject", aqp.address)
            .add("tapePool", aqp.tapePool)
            .add("exceptionMessage", ex.getMessageValue());
      logContext.log(log::DEBUG, "WARNING: In OStoreDB::fetchMountInfo(): failed to lock/fetch an archive queue for user. Skipping it.");
      continue;
    }
    // If there are files queued, we create an entry for this tape pool in the
    // mount candidates list.
    cta::objectstore::ArchiveQueue::JobsSummary aqueueJobsSummary = aqueue.getJobsSummary();
    if (aqueueJobsSummary.jobs) {
      tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
      auto & m = tmdi.potentialMounts.back();
      m.tapePool = aqp.tapePool;
      m.type = cta::common::dataStructures::MountType::ArchiveForUser;
      m.bytesQueued = aqueueJobsSummary.bytes;
      m.filesQueued = aqueueJobsSummary.jobs;
      m.oldestJobStartTime = aqueueJobsSummary.oldestJobStartTime;
      // By default, we get the mount policy parameters from the object store's queue counters.
      m.priority = aqueueJobsSummary.priority;
      m.maxDrivesAllowed = aqueueJobsSummary.maxDrivesAllowed;
      m.minRequestAge = aqueueJobsSummary.minArchiveRequestAge;
      // If there are mount policies in the catalogue...
      if(mountPolicies.size()) {
        // ...we get all the mount policies referenced by the queue from the catalogue list.
        auto mountPoliciesInQueueList = getMountPoliciesInQueue(mountPolicies,aqueueJobsSummary.mountPolicyCountMap);
        // If an operator removed the queue's mount policies from the catalogue, we will have no results.
        if(mountPoliciesInQueueList.size()){
          auto mountPolicyToUse = createBestArchiveMountPolicy(mountPoliciesInQueueList);
          m.priority = mountPolicyToUse.archivePriority;
          m.maxDrivesAllowed = mountPolicyToUse.maxDrivesAllowed;
          m.minRequestAge = mountPolicyToUse.archiveMinRequestAge;
        }
      }
      m.logicalLibrary = "";
    } else {
      tmdi.queueTrimRequired = true;
    }
    auto processingTime = t.secs(utils::Timer::resetCounter);
    log::ScopedParamContainer params (logContext);
    params.add("queueObject", aqp.address)
          .add("tapePool", aqp.tapePool)
          .add("queueType", toString(cta::common::dataStructures::MountType::ArchiveForUser))
          .add("queueLockTime", queueLockTime)
          .add("queueFetchTime", queueFetchTime)
          .add("processingTime", processingTime);
    if(queueLockTime > 1 || queueFetchTime > 1) {
      logContext.log(log::WARNING, "In OStoreDB::fetchMountInfo(): fetched an archive queue for user and that lasted more than 1 second.");
    }
  }
  // Walk the archive queues for REPACK for statistics
  for (auto & aqp: re.dumpArchiveQueues(JobQueueType::JobsToTransferForRepack)) {
    objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) poolName = aqp.tapePool;
    objectstore::ScopedSharedLock aqlock;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    try {
      aqueue.fetchNoLock();
      queueFetchTime = t.secs(utils::Timer::resetCounter);
    } catch (cta::exception::Exception &ex) {
      log::ScopedParamContainer params (logContext);
      params.add("queueObject", aqp.address)
            .add("tapePool", aqp.tapePool)
            .add("exceptionMessage", ex.getMessageValue());
      logContext.log(log::DEBUG, "WARNING: In OStoreDB::fetchMountInfo(): failed to lock/fetch an archive queue for repack. Skipping it.");
      continue;
    }
    // If there are files queued, we create an entry for this tape pool in the
    // mount candidates list.
    cta::objectstore::ArchiveQueue::JobsSummary aqueueRepackJobsSummary = aqueue.getJobsSummary();
    if (aqueueRepackJobsSummary.jobs) {
      tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
      auto & m = tmdi.potentialMounts.back();
      m.tapePool = aqp.tapePool;
      m.type = cta::common::dataStructures::MountType::ArchiveForRepack;
      m.bytesQueued = aqueueRepackJobsSummary.bytes;
      m.filesQueued = aqueueRepackJobsSummary.jobs;
      m.oldestJobStartTime = aqueueRepackJobsSummary.oldestJobStartTime;
      m.priority = aqueueRepackJobsSummary.priority;
      m.maxDrivesAllowed = aqueueRepackJobsSummary.maxDrivesAllowed;
      m.minRequestAge = aqueueRepackJobsSummary.minArchiveRequestAge;
      // If there are mount policies in the catalogue...
      if(mountPolicies.size()) {
        // ...we get all the mount policies referenced by the queue from the catalogue list.
        auto mountPoliciesInQueueList = getMountPoliciesInQueue(mountPolicies,aqueueRepackJobsSummary.mountPolicyCountMap);
        // If an operator removed the queue's mount policies from the catalogue, we will have no results.
        if(mountPoliciesInQueueList.size()){
          auto mountPolicyToUse = createBestArchiveMountPolicy(mountPoliciesInQueueList);
          m.priority = mountPolicyToUse.archivePriority;
          m.maxDrivesAllowed = mountPolicyToUse.maxDrivesAllowed;
          m.minRequestAge = mountPolicyToUse.archiveMinRequestAge;
        }
      }
      m.logicalLibrary = "";
    } else {
      tmdi.queueTrimRequired = true;
    }
    auto processingTime = t.secs(utils::Timer::resetCounter);
    log::ScopedParamContainer params (logContext);
    params.add("queueObject", aqp.address)
          .add("tapePool", aqp.tapePool)
          .add("queueType", toString(cta::common::dataStructures::MountType::ArchiveForRepack))
          .add("queueLockTime", queueLockTime)
          .add("queueFetchTime", queueFetchTime)
          .add("processingTime", processingTime);
    if(queueLockTime > 1 || queueFetchTime > 1) {
      logContext.log(log::WARNING, "In OStoreDB::fetchMountInfo(): fetched an archive queue for repack and that lasted more than 1 second.");
    }
  }
  // Walk the retrieve queues for statistics
  for (auto & rqp: re.dumpRetrieveQueues(JobQueueType::JobsToTransferForUser)) {
    RetrieveQueue rqueue(rqp.address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) vid = rqp.vid;
    ScopedSharedLock rqlock;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    try {
      rqueue.fetchNoLock();
      queueFetchTime = t.secs(utils::Timer::resetCounter);
    } catch (cta::exception::Exception &ex) {
      log::LogContext lc(m_logger);
      log::ScopedParamContainer params (lc);
      params.add("queueObject", rqp.address)
            .add("tapeVid", rqp.vid)
            .add("exceptionMessage", ex.getMessageValue());
      lc.log(log::DEBUG, "WARNING: In OStoreDB::fetchMountInfo(): failed to lock/fetch a retrieve queue. Skipping it.");
      continue;
    }
    // If there are files queued, we create an entry for this retrieve queue in the
    // mount candidates list.
    auto rqSummary = rqueue.getJobsSummary();
    bool isPotentialMount = false;
    auto vidToTapeMap = m_catalogue.getTapesByVid({rqp.vid});
    if(vidToTapeMap.at(rqp.vid).disabled){
      // The tape is disabled: check whether the queue holds repack retrieve requests
      // carrying the forceDisabledTape flag.
      auto retrieveQueueJobs = rqueue.dumpJobs();
      uint64_t nbJobsNotExistInQueue = 0;
      for(auto &job: retrieveQueueJobs){
        cta::objectstore::RetrieveRequest rr(job.address,this->m_objectStore);
        try{
          rr.fetchNoLock();
          if(rr.getRepackInfo().forceDisabledTape){
            // At least one retrieve job is a repack retrieve job with the disabled-tape
            // flag: we have a potential mount.
            isPotentialMount = true;
            break;
          }
        } catch(const cta::objectstore::Backend::NoSuchObject & ex){
          // In the case of a repack cancellation, the RetrieveRequest object is deleted,
          // so we just ignore the exception: it will not make a potential mount.
          nbJobsNotExistInQueue++;
        }
      }
      if(!isPotentialMount && nbJobsNotExistInQueue == retrieveQueueJobs.size()){
        // The tape is disabled and the queue only holds jobs whose requests have been
        // deleted: it is a potential mount, as we want to flush the queue. If at least
        // one job in the queue is not a repack with the --disabledtape flag, the jobs
        // have to stay in the queue as long as the tape is disabled.
        isPotentialMount = true;
      }
    } else {
      isPotentialMount = true;
    }
    if (rqSummary.jobs && (isPotentialMount || purpose == SchedulerDatabase::PurposeGetMountInfo::SHOW_QUEUES)) {
      // Get the default mount policy parameters from the queue summary.
      uint64_t maxDrivesAllowed = rqSummary.maxDrivesAllowed;
      uint64_t minRetrieveRequestAge = rqSummary.minRetrieveRequestAge;
      uint64_t priority = rqSummary.priority;
      // Try to refresh these values from the mount policies in the catalogue.
      if(mountPolicies.size()){
        auto mountPoliciesInQueueList = getMountPoliciesInQueue(mountPolicies,rqSummary.mountPolicyCountMap);
        if(mountPoliciesInQueueList.size()){
          // We need to get the most advantageous mount policy.
          common::dataStructures::MountPolicy mountPolicyToUse = createBestRetrieveMountPolicy(mountPoliciesInQueueList);
          priority = mountPolicyToUse.retrievePriority;
          maxDrivesAllowed = mountPolicyToUse.maxDrivesAllowed;
          minRetrieveRequestAge = mountPolicyToUse.retrieveMinRequestAge;
        }
      }
      // Check if we have activities and if all the jobs are covered by one or not (possible mixed case).
      bool jobsWithoutActivity = true;
      if (rqSummary.activityCounts.size()) {
        if (rqSummary.activityCounts.size() >= rqSummary.jobs)
          jobsWithoutActivity = false;
        // In all cases, we create one potential mount per activity.
        for (auto ac: rqSummary.activityCounts) {
          tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
          auto & m = tmdi.potentialMounts.back();
          m.vid = rqp.vid;
          m.type = cta::common::dataStructures::MountType::Retrieve;
          m.bytesQueued = rqSummary.bytes;
          m.filesQueued = rqSummary.jobs;
          m.oldestJobStartTime = rqSummary.oldestJobStartTime;
          m.priority = priority;
          m.maxDrivesAllowed = maxDrivesAllowed;
          m.minRequestAge = minRetrieveRequestAge;
          m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
          m.tapePool = "";       // The tape pool is not known here, and will be determined by the caller.
          m.vendor = "";         // The vendor is not known here, and will be determined by the caller.
          m.mediaType = "";      // The media type is not known here, and will be determined by the caller.
          m.vo = "";             // The VO is not known here, and will be determined by the caller.
          m.capacityInBytes = 0; // The capacity is not known here, and will be determined by the caller.
          m.activityNameAndWeightedMountCount = PotentialMount::ActivityNameAndWeightedMountCount();
          m.activityNameAndWeightedMountCount.value().activity = ac.activity;
          m.activityNameAndWeightedMountCount.value().weight = ac.weight;
          m.activityNameAndWeightedMountCount.value().weightedMountCount = 0.0; // This value will be computed later by the caller.
          m.activityNameAndWeightedMountCount.value().mountCount = 0; // This value will be computed later by the caller.
          // We will display the sleep flag only if it has not expired (15 minutes timeout, hardcoded).
          // This allows having a single decision point instead of implementing it at the consumer level.
          if (rqSummary.sleepInfo && (::time(nullptr) < (rqSummary.sleepInfo.value().sleepStartTime 
              + (int64_t) rqSummary.sleepInfo.value().sleepTime)) ) {
            m.sleepingMount = true;
            m.sleepStartTime = rqSummary.sleepInfo.value().sleepStartTime;
            m.diskSystemSleptFor = rqSummary.sleepInfo.value().diskSystemSleptFor;
            m.sleepTime = rqSummary.sleepInfo.value().sleepTime;
          }
        }
      }
      if (jobsWithoutActivity) {
        tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
        auto & m = tmdi.potentialMounts.back();
        m.vid = rqp.vid;
        m.type = cta::common::dataStructures::MountType::Retrieve;
        m.bytesQueued = rqSummary.bytes;
        m.filesQueued = rqSummary.jobs;
        m.oldestJobStartTime = rqSummary.oldestJobStartTime;
        m.priority = priority;
        m.maxDrivesAllowed = maxDrivesAllowed;
        m.minRequestAge = minRetrieveRequestAge;
        m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
        m.tapePool = "";       // The tape pool is not known here, and will be determined by the caller.
        m.vendor = "";         // The vendor is not known here, and will be determined by the caller.
        m.mediaType = "";      // The media type is not known here, and will be determined by the caller.
        m.vo = "";             // The VO is not known here, and will be determined by the caller.
        m.capacityInBytes = 0; // The capacity is not known here, and will be determined by the caller.
        // We will display the sleep flag only if it has not expired (15 minutes timeout, hardcoded).
        // This allows having a single decision point instead of implementing it at the consumer level.
        if (rqSummary.sleepInfo && (::time(nullptr) < (rqSummary.sleepInfo.value().sleepStartTime
            + (int64_t) rqSummary.sleepInfo.value().sleepTime)) ) {
          m.sleepingMount = true;
          m.sleepStartTime = rqSummary.sleepInfo.value().sleepStartTime;
          m.diskSystemSleptFor = rqSummary.sleepInfo.value().diskSystemSleptFor;
          m.sleepTime = rqSummary.sleepInfo.value().sleepTime;
        }
      }
    } else {
      if(!rqSummary.jobs)
        tmdi.queueTrimRequired = true;
    }
    auto processingTime = t.secs(utils::Timer::resetCounter);
    log::ScopedParamContainer params (logContext);
    params.add("queueObject", rqp.address)
          .add("tapeVid", rqp.vid)
          .add("queueLockTime", queueLockTime)
          .add("queueFetchTime", queueFetchTime)
          .add("processingTime", processingTime);
    if(queueLockTime > 1 || queueFetchTime > 1){
      logContext.log(log::WARNING, "In OStoreDB::fetchMountInfo(): fetched a retrieve queue and that lasted more than 1 second.");
    }
  }
  // Collect information about the existing and next mounts.
  // If a next mount exists, the drive "counts double", as the corresponding drive
  // is either about to mount, or about to replace its current mount.
  double registerFetchTime = 0;
  auto driveStates = Helpers::getAllDriveStates(m_objectStore, logContext);
  registerFetchTime = t.secs(utils::Timer::resetCounter);
  using common::dataStructures::DriveStatus;
  std::set<int> activeDriveStatuses = {
    (int)cta::common::dataStructures::DriveStatus::Starting,
    (int)cta::common::dataStructures::DriveStatus::Mounting,
    (int)cta::common::dataStructures::DriveStatus::Transferring,
    (int)cta::common::dataStructures::DriveStatus::Unloading,
    (int)cta::common::dataStructures::DriveStatus::Unmounting,
    (int)cta::common::dataStructures::DriveStatus::DrainingToDisk,
    (int)cta::common::dataStructures::DriveStatus::CleaningUp };
  std::set<int> activeMountTypes = {
    (int)cta::common::dataStructures::MountType::ArchiveForUser,
    (int)cta::common::dataStructures::MountType::ArchiveForRepack,
    (int)cta::common::dataStructures::MountType::Retrieve,
    (int)cta::common::dataStructures::MountType::Label };
  for (const auto &d : driveStates) {
    if (activeDriveStatuses.count((int)d.driveStatus)) {
      tmdi.existingOrNextMounts.push_back(ExistingMount());
      tmdi.existingOrNextMounts.back().type = d.mountType;
      tmdi.existingOrNextMounts.back().tapePool = d.currentTapePool;
      tmdi.existingOrNextMounts.back().driveName = d.driveName;
      tmdi.existingOrNextMounts.back().vid = d.currentVid;
      tmdi.existingOrNextMounts.back().currentMount = true;
      tmdi.existingOrNextMounts.back().bytesTransferred = d.bytesTransferredInSession;
      tmdi.existingOrNextMounts.back().filesTransferred = d.filesTransferredInSession;
      tmdi.existingOrNextMounts.back().latestBandwidth = d.latestBandwidth;
      if (d.currentActivityAndWeight)
        tmdi.existingOrNextMounts.back().activity = d.currentActivityAndWeight.value().activity;
    }
    if (activeMountTypes.count((int)d.nextMountType)) {
      tmdi.existingOrNextMounts.push_back(ExistingMount());
      tmdi.existingOrNextMounts.back().type = d.nextMountType;
      tmdi.existingOrNextMounts.back().tapePool = d.nextTapepool;
      tmdi.existingOrNextMounts.back().driveName = d.driveName;
      tmdi.existingOrNextMounts.back().vid = d.nextVid;
      tmdi.existingOrNextMounts.back().currentMount = false;
      tmdi.existingOrNextMounts.back().bytesTransferred = 0;
      tmdi.existingOrNextMounts.back().filesTransferred = 0;
      tmdi.existingOrNextMounts.back().latestBandwidth = 0;
      if (d.nextActivityAndWeight)
        tmdi.existingOrNextMounts.back().activity = d.nextActivityAndWeight.value().activity;
    }
  }
  auto registerProcessingTime = t.secs(utils::Timer::resetCounter);
  log::ScopedParamContainer params (logContext);
  params.add("queueFetchTime", registerFetchTime)
        .add("processingTime", registerProcessingTime);
  if ((registerFetchTime > 1) || (registerProcessingTime > 1))
    logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched the drive register.");
}

//------------------------------------------------------------------------------
// OStoreDB::getMountPoliciesInQueue()
//------------------------------------------------------------------------------
std::list<common::dataStructures::MountPolicy> OStoreDB::getMountPoliciesInQueue(const std::list<common::dataStructures::MountPolicy> & mountPoliciesInCatalogue, const std::map<std::string, uint64_t>& queueMountPolicyMap) {
  std::list<cta::common::dataStructures::MountPolicy> mountPolicyRet;
  std::copy_if(mountPoliciesInCatalogue.begin(),mountPoliciesInCatalogue.end(),std::back_inserter(mountPolicyRet),[&queueMountPolicyMap](const cta::common::dataStructures::MountPolicy & mp){
    return queueMountPolicyMap.find(mp.name) != queueMountPolicyMap.end();
  });
  return mountPolicyRet;
}
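
// Illustrative example (policy names are hypothetical): if the catalogue holds
// mount policies {"default", "repack", "vip"} and the queue summary's
// mountPolicyCountMap references {"repack", "vip"}, the function returns the
// catalogue entries for "repack" and "vip" only.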

//------------------------------------------------------------------------------
// OStoreDB::createBestArchiveMountPolicy()
//------------------------------------------------------------------------------
common::dataStructures::MountPolicy OStoreDB::createBestArchiveMountPolicy(const std::list<common::dataStructures::MountPolicy>& mountPolicies) {
  if(mountPolicies.empty()){
    throw cta::exception::Exception("In OStoreDB::createBestArchiveMountPolicy(), empty mount policy list.");
  }
  // We need to build the most advantageous mount policy: highest priority,
  // lowest minimum request age and highest number of drives allowed.
  // As the init element of the reduce is the first element of the list, we
  // start the reduce with the second element (++mountPolicies.begin()).
  common::dataStructures::MountPolicy bestMountPolicy = cta::utils::reduce(++mountPolicies.begin(), mountPolicies.end(),mountPolicies.front(),[](const common::dataStructures::MountPolicy & mp1, const common::dataStructures::MountPolicy & mp2){
    common::dataStructures::MountPolicy mp = mp1;
    if(mp2.archivePriority > mp.archivePriority){
      mp.archivePriority = mp2.archivePriority;
    }
    if(mp2.archiveMinRequestAge < mp.archiveMinRequestAge){
      mp.archiveMinRequestAge = mp2.archiveMinRequestAge;
    }
    if(mp2.maxDrivesAllowed > mp.maxDrivesAllowed){
      mp.maxDrivesAllowed = mp2.maxDrivesAllowed;
    }
    return mp;
  });
  return bestMountPolicy;
}

//------------------------------------------------------------------------------
// OStoreDB::createBestRetrieveMountPolicy()
//------------------------------------------------------------------------------
common::dataStructures::MountPolicy OStoreDB::createBestRetrieveMountPolicy(const std::list<common::dataStructures::MountPolicy>& mountPolicies) {
  if(mountPolicies.empty()){
    throw cta::exception::Exception("In OStoreDB::createBestRetrieveMountPolicy(), empty mount policy list.");
  }
  // We need to build the most advantageous mount policy: highest priority,
  // lowest minimum request age and highest number of drives allowed.
  // As the init element of the reduce is the first element of the list, we
  // start the reduce with the second element (++mountPolicies.begin()).
  common::dataStructures::MountPolicy bestMountPolicy = cta::utils::reduce(++mountPolicies.begin(), mountPolicies.end(),mountPolicies.front(),[](const common::dataStructures::MountPolicy & mp1, const common::dataStructures::MountPolicy & mp2){
    common::dataStructures::MountPolicy mp = mp1;
    if(mp2.retrievePriority > mp.retrievePriority){
      mp.retrievePriority = mp2.retrievePriority;
    }
    if(mp2.retrieveMinRequestAge < mp.retrieveMinRequestAge){
      mp.retrieveMinRequestAge = mp2.retrieveMinRequestAge;
    }
    if(mp2.maxDrivesAllowed > mp.maxDrivesAllowed){
      mp.maxDrivesAllowed = mp2.maxDrivesAllowed;
    }
    return mp;
  });
  return bestMountPolicy;
}
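
// Illustrative example of the merge above (hypothetical values): reducing
// A{retrievePriority=1, retrieveMinRequestAge=600, maxDrivesAllowed=2} with
// B{retrievePriority=5, retrieveMinRequestAge=100, maxDrivesAllowed=1} yields
// {retrievePriority=5, retrieveMinRequestAge=100, maxDrivesAllowed=2}: the
// best value of each field, possibly coming from different policies.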



//------------------------------------------------------------------------------
// OStoreDB::getMountInfo()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> 
  OStoreDB::getMountInfo(log::LogContext& logContext) {
  utils::Timer t;
  // Allocate the getMountInfo structure to return.
  assertAgentAddressSet();
  std::unique_ptr<OStoreDB::TapeMountDecisionInfo> privateRet (new OStoreDB::TapeMountDecisionInfo(*this));
  TapeMountDecisionInfo & tmdi=*privateRet;
  // Get all the tape pools and tapes with queues (potential mounts)
  objectstore::RootEntry re(m_objectStore);
  re.fetchNoLock();
  auto rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
  // Take an exclusive lock on the scheduler global lock and fetch it.
  tmdi.m_schedulerGlobalLock.reset(
    new SchedulerGlobalLock(re.getSchedulerGlobalLock(), m_objectStore));
  tmdi.m_lockOnSchedulerGlobalLock.lock(*tmdi.m_schedulerGlobalLock);
  auto lockSchedGlobalTime = t.secs(utils::Timer::resetCounter);
  tmdi.m_lockTaken = true;
  tmdi.m_schedulerGlobalLock->fetch();
  auto fetchSchedGlobalTime = t.secs(utils::Timer::resetCounter);
  fetchMountInfo(tmdi, re, SchedulerDatabase::PurposeGetMountInfo::GET_NEXT_MOUNT, logContext);
  auto fetchMountInfoTime = t.secs(utils::Timer::resetCounter);
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
  {
    log::ScopedParamContainer params(logContext);
    params.add("rootFetchNoLockTime", rootFetchNoLockTime)
          .add("lockSchedGlobalTime", lockSchedGlobalTime)
          .add("fetchSchedGlobalTime", fetchSchedGlobalTime)
          .add("fetchMountInfoTime", fetchMountInfoTime);
    if ((rootFetchNoLockTime > 1) || (lockSchedGlobalTime > 1) || (fetchSchedGlobalTime > 1) || fetchMountInfoTime > 1)
      logContext.log(log::INFO, "In OStoreDB::getMountInfo(): success.");
  }
  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getMountInfoNoLock()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> OStoreDB::getMountInfoNoLock(PurposeGetMountInfo purpose, log::LogContext & logContext) {
  utils::Timer t;
  // Allocate the getMountInfo structure to return.
  assertAgentAddressSet();
  std::unique_ptr<OStoreDB::TapeMountDecisionInfoNoLock> privateRet (new OStoreDB::TapeMountDecisionInfoNoLock);
  // Get all the tape pools and tapes with queues (potential mounts)
  objectstore::RootEntry re(m_objectStore);
  re.fetchNoLock();
  auto rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
  TapeMountDecisionInfoNoLock & tmdi=*privateRet;
  fetchMountInfo(tmdi, re, purpose, logContext);
  auto fetchMountInfoTime = t.secs(utils::Timer::resetCounter);
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
  {
    log::ScopedParamContainer params(logContext);
    params.add("rootFetchNoLockTime", rootFetchNoLockTime)
          .add("fetchMountInfoTime", fetchMountInfoTime);
    if ((rootFetchNoLockTime > 1) || (fetchMountInfoTime > 1))
      logContext.log(log::INFO, "In OStoreDB::getMountInfoNoLock(): success.");
  }
  return ret;
}
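
// Note (a summary of the two variants above): getMountInfo() takes and holds
// the scheduler global lock, so its caller can atomically decide on and create
// the next mount, while getMountInfoNoLock() only snapshots the same
// information (e.g. for displaying queues); its TapeMountDecisionInfoNoLock's
// create*Mount() methods deliberately throw.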
//------------------------------------------------------------------------------
// OStoreDB::trimEmptyQueues()
//------------------------------------------------------------------------------
void OStoreDB::trimEmptyQueues(log::LogContext& lc) {
  // We will trim empty queues from the root entry.
  lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): will start trimming empty queues");
  // Get an exclusive lock on the root entry, as we have good chances to need it.
  RootEntry re(m_objectStore);
  ScopedExclusiveLock rel(re);
  re.fetch();
  for (auto & queueType: { JobQueueType::JobsToTransferForUser, JobQueueType::JobsToReportToUser, JobQueueType::FailedJobs} ) {
    try {
      auto archiveQueueList = re.dumpArchiveQueues(queueType);
      for (auto & a: archiveQueueList) {
        ArchiveQueue aq(a.address, m_objectStore);
        ScopedSharedLock aql(aq);
        aq.fetch();
        if (!aq.getJobsSummary().jobs) {
          aql.release();
          re.removeArchiveQueueAndCommit(a.tapePool, queueType, lc);
          log::ScopedParamContainer params(lc);
          params.add("tapePool", a.tapePool)
                .add("queueType", toString(queueType))
                .add("queueObject", a.address);
          lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty archive queue.");
        }
      }
      auto retrieveQueueList = re.dumpRetrieveQueues(queueType);
      for (auto &r : retrieveQueueList) {
        RetrieveQueue rq(r.address, m_objectStore);
        ScopedSharedLock rql(rq);
        rq.fetch();
        if (!rq.getJobsSummary().jobs) {
          rql.release();
          re.removeRetrieveQueueAndCommit(r.vid, queueType, lc);
          log::ScopedParamContainer params(lc);
          params.add("tapeVid", r.vid)
                .add("queueType", toString(queueType))
                .add("queueObject", r.address);
          lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty retrieve queue.");
        }
      }
    } catch (cta::exception::Exception & ex) {
      log::ScopedParamContainer params(lc);
      params.add("exceptionMessage", ex.getMessageValue());
      lc.log(log::ERR, "In OStoreDB::trimEmptyQueues(): got an exception. Stack trace follows.");
      lc.logBacktrace(log::ERR, ex.backtrace());
    }
  }
}

//------------------------------------------------------------------------------
// OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::ArchiveMount> OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount(
        common::dataStructures::MountType type,
        const catalogue::TapeForWriting& tape,
        const std::string driveName,
        const std::string& logicalLibrary,
        const std::string& hostName,
        const std::string& vo,
        const std::string& mediaType,
        const std::string& vendor,
        const uint64_t capacityInBytes,
        time_t startTime) {
  throw cta::exception::Exception("In OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount(): This function should not be called");
}

//------------------------------------------------------------------------------
// OStoreDB::TapeMountDecisionInfoNoLock::createRetrieveMount()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::RetrieveMount> OStoreDB::TapeMountDecisionInfoNoLock::createRetrieveMount(const std::string& vid,
        const std::string& tapePool,
        const std::string driveName,
        const std::string& logicalLibrary,
        const std::string& hostName,
        const std::string& vo,
        const std::string& mediaType,
        const std::string& vendor,
        const uint64_t capacityInBytes,
        time_t startTime, const optional<common::dataStructures::DriveState::ActivityAndWeight> &) {
  throw cta::exception::Exception("In OStoreDB::TapeMountDecisionInfoNoLock::createRetrieveMount(): This function should not be called");
}

//------------------------------------------------------------------------------
// OStoreDB::TapeMountDecisionInfoNoLock::~TapeMountDecisionInfoNoLock()
//------------------------------------------------------------------------------
OStoreDB::TapeMountDecisionInfoNoLock::~TapeMountDecisionInfoNoLock() {}

//------------------------------------------------------------------------------
// OStoreDB::queueArchive()
//------------------------------------------------------------------------------
std::string OStoreDB::queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, 
        const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) {
  assertAgentAddressSet();
  cta::utils::Timer timer;
  auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
  cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
  auto * mutexForHelgrindAddr = mutexForHelgrind.release();
  // Construct the archive request object in memory
  auto aReq = cta::make_unique<cta::objectstore::ArchiveRequest> (m_agentReference->nextId("ArchiveRequest"), m_objectStore);
  aReq->initialize();
  // Summarize all as an archiveFile
  cta::common::dataStructures::ArchiveFile aFile;
  aFile.archiveFileID = criteria.fileId;
  aFile.checksumBlob = request.checksumBlob;
  // TODO: a fully fledged archive file should not be the representation of the request.
  aFile.creationTime = std::numeric_limits<decltype(aFile.creationTime)>::min();
  aFile.reconciliationTime = 0;
  aFile.diskFileId = request.diskFileID;
  aFile.diskFileInfo = request.diskFileInfo;
  aFile.diskInstance = instanceName;
  aFile.fileSize = request.fileSize;
  aFile.storageClass = request.storageClass;
  aReq->setArchiveFile(aFile);
  aReq->setMountPolicy(criteria.mountPolicy);
  aReq->setArchiveReportURL(request.archiveReportURL);
  aReq->setArchiveErrorReportURL(request.archiveErrorReportURL);
  aReq->setRequester(request.requester);
  aReq->setSrcURL(request.srcURL);
  aReq->setEntryLog(request.creationLog);
  std::list<cta::objectstore::ArchiveRequest::JobDump> jl;
  for (auto & copy:criteria.copyToPoolMap) {
    const uint32_t hardcodedRetriesWithinMount = 2;
    const uint32_t hardcodedTotalRetries = 2;
    const uint32_t hardcodedReportRetries = 2;
    aReq->addJob(copy.first, copy.second, m_agentReference->getAgentAddress(),
        hardcodedRetriesWithinMount, hardcodedTotalRetries, hardcodedReportRetries);
    jl.push_back(cta::objectstore::ArchiveRequest::JobDump());
    jl.back().copyNb = copy.first;
    jl.back().tapePool = copy.second;
  }
  if (jl.empty()) {
    throw ArchiveRequestHasNoCopies("In OStoreDB::queueArchive: the archive to file request has no copy");
  }
  // We create the object here
  m_agentReference->addToOwnership(aReq->getAddressIfSet(), m_objectStore);
  double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
  aReq->insert();
  std::string archiveRequestAddr = aReq->getAddressIfSet();
  double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
  // The request is now safe in the object store. We can now return to the caller and fire (and forget)
  // a task to complete the bottom half of it.
  m_taskQueueSize++;
  uint64_t taskQueueSize = m_taskQueueSize;
  // Prepare the logs to avoid multithread access on the object.
  log::ScopedParamContainer params(logContext);
  params.add("jobObject", aReq->getAddressIfSet())
        .add("fileId", aFile.archiveFileID)
        .add("diskInstance", aFile.diskInstance)
        .add("diskFilePath", aFile.diskFileInfo.path)
        .add("diskFileId", aFile.diskFileId)
        .add("agentReferencingTime", agentReferencingTime)
        .add("insertionTime", insertionTime);
  delayIfNecessary(logContext);
  auto * aReqPtr = aReq.release();
  auto * et = new EnqueueingTask([aReqPtr, mutexForHelgrindAddr, this]{
    std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr);
    cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
    std::unique_ptr<cta::objectstore::ArchiveRequest> aReq(aReqPtr);
    // This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
    auto scopedCounterDecrement = [this](void *){ 
      m_taskQueueSize--;
      m_taskPostingSemaphore.release();
    };
    // A bit ugly, but we need a non-null pointer for the "deleter" to be called.
    std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
    log::LogContext logContext(m_logger);
    utils::Timer timer;
    ScopedExclusiveLock arl(*aReq);
    double arRelockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    double arTotalQueueingTime = 0;
    double arTotalCommitTime = 0;
    double arTotalQueueUnlockTime = 0;
    // We can now enqueue the requests
    std::list<std::string> linkedTapePools;
    std::string currentTapepool;
    try {
      for (auto &j: aReq->dumpJobs()) {
        currentTapepool = j.tapePool;
        // The shared lock will be released automatically at the end of this scope.
        // The queueing implicitly sets the job owner as the queue (as it should). The queue should not
        // be unlocked before we commit the archive request (otherwise, the pointer could be seen as
        // stale and the job would be dereferenced from the queue).
        auto shareLock = ostoredb::MemArchiveQueue::sharedAddToQueue(j, j.tapePool, *aReq, *this, logContext);
        double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
        arTotalQueueingTime += qTime;
        aReq->commit();
        double cTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
        arTotalCommitTime += cTime;
        // Now we can let go of the queue.
        shareLock.reset();
        double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
        arTotalQueueUnlockTime += qUnlockTime;
        linkedTapePools.push_back(j.owner);
        log::ScopedParamContainer params(logContext);
        params.add("tapePool", j.tapePool)
              .add("queueObject", j.owner)
              .add("jobObject", aReq->getAddressIfSet())
              .add("queueingTime", qTime)
              .add("commitTime", cTime)
              .add("queueUnlockTime", qUnlockTime);
        logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): added job to queue.");
      }
    } catch (NoSuchArchiveQueue &ex) {
      // Unlink the request from already connected tape pools
      for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) {
        objectstore::ArchiveQueue aq(*tpa, m_objectStore);
        ScopedExclusiveLock aql(aq);
        aq.fetch();
        aq.removeJobsAndCommit({aReq->getAddressIfSet()});
      }
      aReq->remove();
      log::ScopedParamContainer params(logContext);
      params.add("tapePool", currentTapepool)
            .add("archiveRequestObject", aReq->getAddressIfSet())
            .add("exceptionMessage", ex.getMessageValue())
            .add("jobObject", aReq->getAddressIfSet());
      logContext.log(log::ERR, "In OStoreDB::queueArchive_bottomHalf(): failed to enqueue job");
      return;
    }
    // The request is now fully set.
    auto archiveFile = aReq->getArchiveFile();
    double arOwnerResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    arl.release();
    double arLockRelease = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    // And remove reference from the agent
    m_agentReference->removeFromOwnership(aReq->getAddressIfSet(), m_objectStore);
    double agOwnershipResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    log::ScopedParamContainer params(logContext);
    params.add("jobObject", aReq->getAddressIfSet())
          .add("fileId", archiveFile.archiveFileID)
          .add("diskInstance", archiveFile.diskInstance)
          .add("diskFilePath", archiveFile.diskFileInfo.path)
          .add("diskFileId", archiveFile.diskFileId)
          .add("creationAndRelockTime", arRelockTime)
          .add("totalQueueingTime", arTotalQueueingTime)
          .add("totalCommitTime", arTotalCommitTime)
          .add("totalQueueUnlockTime", arTotalQueueUnlockTime)
          .add("ownerResetTime", arOwnerResetTime)
          .add("lockReleaseTime", arLockRelease)
          .add("agentOwnershipResetTime", agOwnershipResetTime)
          .add("totalTime", arRelockTime + arTotalQueueingTime + arTotalCommitTime
             + arTotalQueueUnlockTime + arOwnerResetTime + arLockRelease 
             + agOwnershipResetTime);
    logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): Finished enqueueing request.");
  });
  ANNOTATE_HAPPENS_BEFORE(et);
  mlForHelgrind.unlock();
  m_enqueueingTasksQueue.push(et);
  double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
  params.add("taskPostingTime", taskPostingTime)
        .add("taskQueueSize", taskQueueSize)
        .add("totalTime", agentReferencingTime + insertionTime + taskPostingTime);
  logContext.log(log::INFO, "In OStoreDB::queueArchive(): recorded request for queueing (enqueueing posted to thread pool).");
  return archiveRequestAddr;
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::ArchiveJob>
  OStoreDB::getArchiveJobs(const std::string &tapePoolName) const
{
  std::list<cta::common::dataStructures::ArchiveJob> ret;

  for(ArchiveQueueItor_t q_it(m_objectStore, JobQueueType::JobsToTransferForUser, tapePoolName); !q_it.end() ; ++q_it) {
    ret.push_back(*q_it);
  }

  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::ArchiveJob>>
   OStoreDB::getArchiveJobs() const
{
  std::map<std::string, std::list<common::dataStructures::ArchiveJob>> ret;

  for(ArchiveQueueItor_t q_it(m_objectStore, JobQueueType::JobsToTransferForUser); !q_it.end(); ++q_it) {
    ret[q_it.qid()].push_back(*q_it);
  }

  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobItor()
//------------------------------------------------------------------------------
OStoreDB::ArchiveQueueItor_t OStoreDB::getArchiveJobItor(const std::string &tapePoolName, JobQueueType queueType) const
{
  return ArchiveQueueItor_t(m_objectStore, queueType, tapePoolName);
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobItorPtr()
//------------------------------------------------------------------------------
OStoreDB::ArchiveQueueItor_t* OStoreDB::getArchiveJobItorPtr(const std::string &tapePoolName, JobQueueType queueType) const
{
  return new ArchiveQueueItor_t(m_objectStore, queueType, tapePoolName);
}

//------------------------------------------------------------------------------
// OStoreDB::getNextArchiveJobsToReportBatch()
//------------------------------------------------------------------------------
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArchiveJobsToReportBatch(
    uint64_t filesRequested, log::LogContext& logContext) {
  typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToReportForUser> AQTRAlgo;
  AQTRAlgo aqtrAlgo(m_objectStore, *m_agentReference);
  // Decide from which queue we are going to pop.
  RootEntry re(m_objectStore);
  re.fetchNoLock();
  auto queueList = re.dumpArchiveQueues(JobQueueType::JobsToReportToUser);
  std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
  if (queueList.empty()) return ret;
  // Try to get jobs from the first non-empty queue.
  AQTRAlgo::PopCriteria criteria;
  criteria.files = filesRequested;
  AQTRAlgo::PoppedElementsBatch jobs;
  std::string tapePool;
  for (auto & q: queueList) {
      jobs = aqtrAlgo.popNextBatch(q.tapePool, criteria, logContext);
      if (!jobs.elements.empty()){
        // The tape pool is the same for all jobs within a queue.
        tapePool = q.tapePool;
        break;
      }
  }
  for (auto & j: jobs.elements) {
    std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), *this));
    aj->tapeFile.copyNb = j.copyNb;
    aj->archiveFile = j.archiveFile;
    aj->srcURL = j.srcURL;
    aj->archiveReportURL = j.archiveReportURL;
    aj->errorReportURL = j.errorReportURL;
    aj->latestError = j.latestError;
    aj->reportType = j.reportType;
    // We leave the tape file unset. It does not exist in all cases (not in case of failure).
    aj->m_jobOwned = true;
    aj->m_mountId = 0;
    aj->m_tapePool = tapePool;
    ret.emplace_back(std::move(aj));
  }
  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobsFailedSummary()
//------------------------------------------------------------------------------
SchedulerDatabase::JobsFailedSummary OStoreDB::getArchiveJobsFailedSummary(log::LogContext &logContext) {