/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "OStoreDB.hpp"
#include "MemQueues.hpp"
//#include "common/dataStructures/SecurityIdentity.hpp"
#include "objectstore/DriveRegister.hpp"
#include "objectstore/DriveState.hpp"
//#include "objectstore/ArchiveRequest.hpp"
//#include "objectstore/RetrieveRequest.hpp"
#include "objectstore/Helpers.hpp"
#include "common/exception/Exception.hpp"
#include "common/utils/utils.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "common/dataStructures/MountPolicy.hpp"
#include "common/make_unique.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
#include <algorithm>
#include <cmath>
#include <stdlib.h>     /* srand, rand */
#include <time.h>       /* time */
#include <stdexcept>
#include <set>
#include <iostream>

namespace cta {  
using namespace objectstore;

//------------------------------------------------------------------------------
// OStoreDB::OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::OStoreDB(objectstore::Backend& be, catalogue::Catalogue & catalogue, log::Logger &logger):
  m_taskQueueSize(0), m_taskPostingSemaphore(5), m_objectStore(be), m_catalogue(catalogue), m_logger(logger) {
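  // Start the default pool of five enqueueing worker threads, matching the
  // initial value of m_taskPostingSemaphore above.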
  for (size_t i=0; i<5; i++) {
    m_enqueueingWorkerThreads.emplace_back(new EnqueueingWorkerThread(m_enqueueingTasksQueue));
    m_enqueueingWorkerThreads.back()->start();
  }
}

//------------------------------------------------------------------------------
// OStoreDB::~OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::~OStoreDB() throw() {
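  // Drain all outstanding bottom-half tasks, then shut down the worker threads
  // by feeding each of them a null task.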
  while (m_taskQueueSize) sleep(1);
  for (__attribute__((unused)) auto &t: m_enqueueingWorkerThreads) m_enqueueingTasksQueue.push(nullptr);
  for (auto &t: m_enqueueingWorkerThreads) {
    t->wait();
    delete t;
    t=nullptr;
  }
}

//------------------------------------------------------------------------------
// OStoreDB::setAgentReference()
//------------------------------------------------------------------------------
void OStoreDB::setAgentReference(objectstore::AgentReference *agentReference) {
  m_agentReference = agentReference;
}

//------------------------------------------------------------------------------
// OStoreDB::assertAgentAddressSet()
//------------------------------------------------------------------------------
void OStoreDB::assertAgentAddressSet() {
  if (!m_agentReference)
    throw AgentNotSet("In OStoreDB::assertAgentSet: Agent address not set");
}

//------------------------------------------------------------------------------
// OStoreDB::waitSubthreadsComplete()
//------------------------------------------------------------------------------
void OStoreDB::waitSubthreadsComplete() {
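  // Yield until all posted bottom-half tasks have been executed.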
  while (m_taskQueueSize) std::this_thread::yield();
}

//------------------------------------------------------------------------------
// OStoreDB::setThreadNumber()
//------------------------------------------------------------------------------
void OStoreDB::setThreadNumber(uint64_t threadNumber) {
  // Clear all threads.
  for (__attribute__((unused)) auto &t: m_enqueueingWorkerThreads) m_enqueueingTasksQueue.push(nullptr);
  for (auto &t: m_enqueueingWorkerThreads) {
    t->wait();
    delete t;
    t=nullptr;
  }
  m_enqueueingWorkerThreads.clear();
  // Create the new ones.
  for (size_t i=0; i<threadNumber; i++) {
    m_enqueueingWorkerThreads.emplace_back(new EnqueueingWorkerThread(m_enqueueingTasksQueue));
    m_enqueueingWorkerThreads.back()->start();
  }
}

//------------------------------------------------------------------------------
// OStoreDB::setBottomHalfQueueSize()
//------------------------------------------------------------------------------
void OStoreDB::setBottomHalfQueueSize(uint64_t tasksNumber) {
  // The semaphore was initialized to 5 (the default queue size), so release the
  // difference; tasksNumber is expected to be at least 5.
  m_taskPostingSemaphore.release(tasksNumber - 5);
}


//------------------------------------------------------------------------------
// OStoreDB::EnqueueingWorkerThread::run()
//------------------------------------------------------------------------------
void OStoreDB::EnqueueingWorkerThread::run() {
  while (true) {
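    // A null task is the shutdown signal pushed by the destructor and by
    // setThreadNumber().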
    std::unique_ptr<EnqueueingTask> et(m_enqueueingTasksQueue.pop());
    if (!et.get()) break;
    ANNOTATE_HAPPENS_AFTER(et.get());
    (*et)();
    ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(et.get());
  }
}

//------------------------------------------------------------------------------
// OStoreDB::delayIfNecessary()
//------------------------------------------------------------------------------
void OStoreDB::delayIfNecessary(log::LogContext& lc) {
  bool delayInserted = false;
  utils::Timer t;
  uint64_t taskQueueSize = m_taskQueueSize;
  double sleepDelay = 0;
  double lockDelay = 0;
  if (m_taskQueueSize > 10000) {
    // Insert a delay that grows linearly from 0 at 10000 queued elements to 10s at 25000 elements.
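    // For example, 17500 queued elements give 10.0 * (17500 - 10000) / 15000 = 5s.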
    double delay = 10.0 * (taskQueueSize - 10000) / 15000;
    double delayInt;
    ::timespec ts;
    ts.tv_nsec = std::modf(delay, &delayInt) * 1000 * 1000 *1000;
    ts.tv_sec = delayInt;
    nanosleep(&ts, nullptr);
    sleepDelay = t.secs(utils::Timer::resetCounter);
    delayInserted = true;
  }
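  // On top of the sleep, cap the number of in-flight bottom-half tasks: if the
  // semaphore is exhausted, block until a completing task releases it.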
  if (!m_taskPostingSemaphore.tryAcquire()) {
    m_taskPostingSemaphore.acquire();
    lockDelay = t.secs(utils::Timer::resetCounter);
    delayInserted = true;
  }
  if (delayInserted) {
    log::ScopedParamContainer params(lc);
    params.add("sleepDelay", sleepDelay)
          .add("lockDelay", lockDelay)
          .add("taskQueueSize", taskQueueSize);
    lc.log(log::INFO, "In OStoreDB::delayIfNecessary(): inserted delay.");
  }
}

//------------------------------------------------------------------------------
// OStoreDB::ping()
//------------------------------------------------------------------------------
void OStoreDB::ping() {
  // Validate we can lock and fetch the root entry.
  objectstore::RootEntry re(m_objectStore);
  re.fetchNoLock();
}

//------------------------------------------------------------------------------
// OStoreDB::fetchMountInfo()
//------------------------------------------------------------------------------
void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, RootEntry& re, 
    log::LogContext & logContext) {
  utils::Timer t, t2;
  // Walk the archive queues for statistics
  for (auto & aqp: re.dumpArchiveQueues(QueueType::LiveJobs)) {
    objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) poolName = aqp.tapePool;
    objectstore::ScopedSharedLock aqlock;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    try {
      aqueue.fetchNoLock();
      queueFetchTime = t.secs(utils::Timer::resetCounter);
    } catch (cta::exception::Exception &ex) {
      log::ScopedParamContainer params (logContext);
      params.add("queueObject", aqp.address)
            .add("tapepool", aqp.tapePool)
            .add("exceptionMessage", ex.getMessageValue());
      logContext.log(log::WARNING, "In OStoreDB::fetchMountInfo(): failed to lock/fetch an archive queue. Skipping it.");
      continue;
    }
    // If there are files queued, we create an entry for this tape pool in the
    // mount candidates list.
    if (aqueue.getJobsSummary().jobs) {
      tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
      auto & m = tmdi.potentialMounts.back();
      m.tapePool = aqp.tapePool;
      m.type = cta::common::dataStructures::MountType::Archive;
      m.bytesQueued = aqueue.getJobsSummary().bytes;
      m.filesQueued = aqueue.getJobsSummary().jobs;      
      m.oldestJobStartTime = aqueue.getJobsSummary().oldestJobStartTime;
      m.priority = aqueue.getJobsSummary().priority;
      m.maxDrivesAllowed = aqueue.getJobsSummary().maxDrivesAllowed;
      m.minRequestAge = aqueue.getJobsSummary().minArchiveRequestAge;
      m.logicalLibrary = "";
    } else {
      tmdi.queueTrimRequired = true;
    }
    auto processingTime = t.secs(utils::Timer::resetCounter);
    log::ScopedParamContainer params (logContext);
    params.add("queueObject", aqp.address)
          .add("tapepool", aqp.tapePool)
          .add("queueLockTime", queueLockTime)
          .add("queueFetchTime", queueFetchTime)
          .add("processingTime", processingTime);
    logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched an archive queue.");
  }
  // Walk the retrieve queues for statistics
  for (auto & rqp: re.dumpRetrieveQueues(QueueType::LiveJobs)) {
    RetrieveQueue rqueue(rqp.address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) vid = rqp.vid;
    ScopedSharedLock rqlock;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    try {
      rqueue.fetchNoLock();
      queueFetchTime = t.secs(utils::Timer::resetCounter);
    } catch (cta::exception::Exception &ex) {
      log::LogContext lc(m_logger);
      log::ScopedParamContainer params (lc);
      params.add("queueObject", rqp.address)
            .add("tapeVid", rqp.vid)
            .add("exceptionMessage", ex.getMessageValue());
      lc.log(log::WARNING, "In OStoreDB::fetchMountInfo(): failed to lock/fetch a retrieve queue. Skipping it.");
      continue;
    }
    // If there are files queued, we create an entry for this retrieve queue in the
    // mount candidates list.
    if (rqueue.getJobsSummary().files) {
      tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
      auto & m = tmdi.potentialMounts.back();
      m.vid = rqp.vid;
      m.type = cta::common::dataStructures::MountType::Retrieve;
      m.bytesQueued = rqueue.getJobsSummary().bytes;
      m.filesQueued = rqueue.getJobsSummary().files;      
      m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime;
      m.priority = rqueue.getJobsSummary().priority;
      m.maxDrivesAllowed = rqueue.getJobsSummary().maxDrivesAllowed;
      m.minRequestAge = rqueue.getJobsSummary().minRetrieveRequestAge;
      m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
    } else {
      tmdi.queueTrimRequired = true;
    }
    auto processingTime = t.secs(utils::Timer::resetCounter);
    log::ScopedParamContainer params (logContext);
    params.add("queueObject", rqp.address)
          .add("tapeVid", rqp.vid)
          .add("queueLockTime", queueLockTime)
          .add("queueFetchTime", queueFetchTime)
          .add("processingTime", processingTime);
    logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched a retrieve queue.");
  }
  // Collect information about the existing and next mounts
  // If a next mount exists the drive "counts double", but the corresponding drive
  // is either about to mount, or about to replace its current mount.
  double registerFetchTime = 0;
  auto driveStates = Helpers::getAllDriveStates(m_objectStore, logContext);
  registerFetchTime = t.secs(utils::Timer::resetCounter);
  using common::dataStructures::DriveStatus;
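  // Drive statuses that count as a mount currently in progress.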
  std::set<int> activeDriveStatuses = {
    (int)cta::common::dataStructures::DriveStatus::Starting,
    (int)cta::common::dataStructures::DriveStatus::Mounting,
    (int)cta::common::dataStructures::DriveStatus::Transferring,
    (int)cta::common::dataStructures::DriveStatus::Unloading,
    (int)cta::common::dataStructures::DriveStatus::Unmounting,
    (int)cta::common::dataStructures::DriveStatus::DrainingToDisk,
    (int)cta::common::dataStructures::DriveStatus::CleaningUp };
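  // Mount types that count as a pending next mount.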
  std::set<int> activeMountTypes = {
    (int)cta::common::dataStructures::MountType::Archive,
    (int)cta::common::dataStructures::MountType::Retrieve,
    (int)cta::common::dataStructures::MountType::Label };
  for (const auto &d : driveStates) {
    if (activeDriveStatuses.count((int)d.driveStatus)) {
      tmdi.existingOrNextMounts.push_back(ExistingMount());
      tmdi.existingOrNextMounts.back().type = d.mountType;
      tmdi.existingOrNextMounts.back().tapePool = d.currentTapePool;
      tmdi.existingOrNextMounts.back().driveName = d.driveName;
      tmdi.existingOrNextMounts.back().vid = d.currentVid;
      tmdi.existingOrNextMounts.back().currentMount = true;
      tmdi.existingOrNextMounts.back().bytesTransferred = d.bytesTransferredInSession;
      tmdi.existingOrNextMounts.back().filesTransferred = d.filesTransferredInSession;
      tmdi.existingOrNextMounts.back().latestBandwidth = d.latestBandwidth;
    }
    if (activeMountTypes.count((int)d.nextMountType)) {
      tmdi.existingOrNextMounts.push_back(ExistingMount());
      tmdi.existingOrNextMounts.back().type = d.nextMountType;
      tmdi.existingOrNextMounts.back().tapePool = d.nextTapepool;
      tmdi.existingOrNextMounts.back().driveName = d.driveName;
      tmdi.existingOrNextMounts.back().vid = d.nextVid;
      tmdi.existingOrNextMounts.back().currentMount = false;
      tmdi.existingOrNextMounts.back().bytesTransferred = 0;
      tmdi.existingOrNextMounts.back().filesTransferred = 0;
      tmdi.existingOrNextMounts.back().latestBandwidth = 0;
    }
  }
  auto registerProcessingTime = t.secs(utils::Timer::resetCounter);
  log::ScopedParamContainer params (logContext);
  params.add("queueFetchTime", registerFetchTime)
        .add("processingTime", registerProcessingTime);
  logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched the drive register.");
}

//------------------------------------------------------------------------------
// OStoreDB::getMountInfo()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> 
  OStoreDB::getMountInfo(log::LogContext& logContext) {
  utils::Timer t;
  // Allocate the structure to return.
  assertAgentAddressSet();
  std::unique_ptr<OStoreDB::TapeMountDecisionInfo> privateRet (new OStoreDB::TapeMountDecisionInfo(*this));
  TapeMountDecisionInfo & tmdi=*privateRet;
  // Get all the tape pools and tapes with queues (potential mounts)
  objectstore::RootEntry re(m_objectStore);
  re.fetchNoLock();
  auto rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
  // Take an exclusive lock on the scheduler global lock and fetch it.
  tmdi.m_schedulerGlobalLock.reset(
    new SchedulerGlobalLock(re.getSchedulerGlobalLock(), m_objectStore));
  tmdi.m_lockOnSchedulerGlobalLock.lock(*tmdi.m_schedulerGlobalLock);
  auto lockSchedGlobalTime = t.secs(utils::Timer::resetCounter);
  tmdi.m_lockTaken = true;
  tmdi.m_schedulerGlobalLock->fetch();
  auto fetchSchedGlobalTime = t.secs(utils::Timer::resetCounter);
  fetchMountInfo(tmdi, re, logContext);
  auto fetchMountInfoTime = t.secs(utils::Timer::resetCounter);
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
  {
    log::ScopedParamContainer params(logContext);
    params.add("rootFetchNoLockTime", rootFetchNoLockTime)
          .add("lockSchedGlobalTime", lockSchedGlobalTime)
          .add("fetchSchedGlobalTime", fetchSchedGlobalTime)
          .add("fetchMountInfoTime", fetchMountInfoTime);
    logContext.log(log::INFO, "In OStoreDB::getMountInfo(): success.");
  }
  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getMountInfoNoLock()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> OStoreDB::getMountInfoNoLock(log::LogContext & logContext) {
  utils::Timer t;
  // Allocate the structure to return.
  assertAgentAddressSet();
  std::unique_ptr<OStoreDB::TapeMountDecisionInfoNoLock> privateRet (new OStoreDB::TapeMountDecisionInfoNoLock);
  // Get all the tape pools and tapes with queues (potential mounts)
  objectstore::RootEntry re(m_objectStore);
  re.fetchNoLock();
  auto rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
  TapeMountDecisionInfoNoLock & tmdi=*privateRet;
  fetchMountInfo(tmdi, re, logContext);
  auto fetchMountInfoTime = t.secs(utils::Timer::resetCounter);
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
  {
    log::ScopedParamContainer params(logContext);
    params.add("rootFetchNoLockTime", rootFetchNoLockTime)
          .add("fetchMountInfoTime", fetchMountInfoTime);
    logContext.log(log::INFO, "In OStoreDB::getMountInfoNoLock(): success.");
  }
  return ret;
}
//------------------------------------------------------------------------------
// OStoreDB::trimEmptyQueues()
//------------------------------------------------------------------------------
void OStoreDB::trimEmptyQueues(log::LogContext& lc) {
  // We will trim empty queues from the root entry.
  lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): will start trimming empty queues");
  // Get an exclusive lock on the root entry, we have good chances to need it.
  RootEntry re(m_objectStore);
  ScopedExclusiveLock rel(re);
  re.fetch();
  try {
    auto archiveQueueList = re.dumpArchiveQueues(QueueType::LiveJobs);
    for (auto & a: archiveQueueList) {
      ArchiveQueue aq(a.address, m_objectStore);
      ScopedSharedLock aql(aq);
      aq.fetch();
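      // If the queue turns out to be empty, release our lock on it and remove
      // it from the (exclusively locked) root entry.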
      if (!aq.getJobsSummary().jobs) {
        aql.release();
        re.removeArchiveQueueAndCommit(a.tapePool, QueueType::LiveJobs, lc);
        log::ScopedParamContainer params(lc);
        params.add("tapePool", a.tapePool)
              .add("queueObject", a.address);
        lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty archive queue.");
      }
    }
    auto retrieveQueueList = re.dumpRetrieveQueues(QueueType::LiveJobs);
    for (auto & r: retrieveQueueList) {
      RetrieveQueue rq(r.address, m_objectStore);
      ScopedSharedLock rql(rq);
      rq.fetch();
      if (!rq.getJobsSummary().files) {
        rql.release();
        re.removeRetrieveQueueAndCommit(r.vid, QueueType::LiveJobs, lc);
        log::ScopedParamContainer params(lc);
        params.add("tapeVid", r.vid)
              .add("queueObject", r.address);
        lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty retrieve queue.");
      }
    }
  } catch (cta::exception::Exception & ex) {
    log::ScopedParamContainer params(lc);
    params.add("exceptionMessage", ex.getMessageValue());
    lc.log(log::ERR, "In OStoreDB::trimEmptyQueues(): got an exception. Stack trace follows.");
    lc.logBacktrace(log::ERR, ex.backtrace());
  }
}

//------------------------------------------------------------------------------
// OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::ArchiveMount> OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount(
        const catalogue::TapeForWriting& tape,
        const std::string driveName,
        const std::string& logicalLibrary,
        const std::string& hostName,
        const std::string& vo,
        const std::string& mediaType,
        const std::string& vendor,
        const uint64_t capacityInBytes,
        time_t startTime) {
  throw cta::exception::Exception("In OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount(): This function should not be called");
}

//------------------------------------------------------------------------------
// OStoreDB::TapeMountDecisionInfoNoLock::createRetrieveMount()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::RetrieveMount> OStoreDB::TapeMountDecisionInfoNoLock::createRetrieveMount(const std::string& vid,
        const std::string& tapePool,
        const std::string driveName,
        const std::string& logicalLibrary,
        const std::string& hostName,
        const std::string& vo,
        const std::string& mediaType,
        const std::string& vendor,
        const uint64_t capacityInBytes,
        time_t startTime) {
  throw cta::exception::Exception("In OStoreDB::TapeMountDecisionInfoNoLock::createRetrieveMount(): This function should not be called");
}

//------------------------------------------------------------------------------
// OStoreDB::TapeMountDecisionInfoNoLock::~TapeMountDecisionInfoNoLock()
//------------------------------------------------------------------------------
OStoreDB::TapeMountDecisionInfoNoLock::~TapeMountDecisionInfoNoLock() {}

//------------------------------------------------------------------------------
// OStoreDB::queueArchive()
//------------------------------------------------------------------------------
void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, 
        const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) {
  assertAgentAddressSet();
  cta::utils::Timer timer;
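  // This mutex presumably exists only to give thread-checking tools (Helgrind)
  // a clean happens-before edge between this top half and the bottom-half
  // lambda, which re-locks it before touching the request.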
  auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
  cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
  auto * mutexForHelgrindAddr = mutexForHelgrind.release();
  // Construct the archive request object in memory
  auto aReq = cta::make_unique<cta::objectstore::ArchiveRequest> (m_agentReference->nextId("ArchiveRequest"), m_objectStore);
  aReq->initialize();
  // Summarize all as an archiveFile
  cta::common::dataStructures::ArchiveFile aFile;
  aFile.archiveFileID = criteria.fileId;
  aFile.checksumType = request.checksumType;
  aFile.checksumValue = request.checksumValue;
  // TODO: fully fledged archive file should not be the representation of the request.
  aFile.creationTime = std::numeric_limits<decltype(aFile.creationTime)>::min();
  aFile.reconciliationTime = 0;
  aFile.diskFileId = request.diskFileID;
  aFile.diskFileInfo = request.diskFileInfo;
  aFile.diskInstance = instanceName;
  aFile.fileSize = request.fileSize;
  aFile.storageClass = request.storageClass;
  aReq->setArchiveFile(aFile);
  aReq->setMountPolicy(criteria.mountPolicy);
  aReq->setArchiveReportURL(request.archiveReportURL);
  aReq->setArchiveErrorReportURL(request.archiveErrorReportURL);
  aReq->setRequester(request.requester);
  aReq->setSrcURL(request.srcURL);
  aReq->setEntryLog(request.creationLog);
  std::list<cta::objectstore::ArchiveRequest::JobDump> jl;
  for (auto & copy:criteria.copyToPoolMap) {
    const uint32_t hardcodedRetriesWithinMount = 2;
    const uint32_t hardcodedTotalRetries = 2;
    aReq->addJob(copy.first, copy.second, m_agentReference->getAgentAddress(), hardcodedRetriesWithinMount, hardcodedTotalRetries);
    jl.push_back(cta::objectstore::ArchiveRequest::JobDump());
    jl.back().copyNb = copy.first;
    jl.back().tapePool = copy.second;
  }
  if (jl.empty()) {
    throw ArchiveRequestHasNoCopies("In OStoreDB::queueArchive(): the archive request has no copy");
  }
  // We create the object here
  m_agentReference->addToOwnership(aReq->getAddressIfSet(), m_objectStore);
  double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
  aReq->setOwner(m_agentReference->getAgentAddress());
  aReq->insert();
  double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
  // The request is now safe in the object store. We can now return to the caller and fire (and forget) a task
  // to complete the bottom half of it.
  m_taskQueueSize++;
  uint64_t taskQueueSize = m_taskQueueSize;
  // Prepare the logs to avoid multithread access on the object.
  log::ScopedParamContainer params(logContext);
  params.add("jobObject", aReq->getAddressIfSet())
        .add("fileId", aFile.archiveFileID)
        .add("diskInstance", aFile.diskInstance)
        .add("diskFilePath", aFile.diskFileInfo.path)
        .add("diskFileId", aFile.diskFileId)
        .add("agentReferencingTime", agentReferencingTime)
        .add("insertionTime", insertionTime);
  delayIfNecessary(logContext);
  auto * aReqPtr = aReq.release();
  auto * et = new EnqueueingTask([aReqPtr, mutexForHelgrindAddr, this]{
    std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr);
    cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
    std::unique_ptr<cta::objectstore::ArchiveRequest> aReq(aReqPtr);
    // This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
    auto scopedCounterDecrement = [this](void *){ 
      m_taskQueueSize--;
      m_taskPostingSemaphore.release();
    };
    // A bit ugly, but we need a non-null pointer for the "deleter" to be called.
    std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
    log::LogContext logContext(m_logger);
    utils::Timer timer;
    ScopedExclusiveLock arl(*aReq);
    double arRelockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    double arTotalQueueingTime = 0;
    double arTotalCommitTime = 0;
    double arTotalQueueUnlockTime = 0;
    // We can now enqueue the requests
    std::list<std::string> linkedTapePools;
    std::string currentTapepool;
    try {
      for (auto &j: aReq->dumpJobs()) {
        currentTapepool = j.tapePool;
        // The shared lock will be released automatically at the end of this scope.
        // The queueing implicitly sets the job owner as the queue (as should be). The queue should not
        // be unlocked before we commit the archive request (otherwise, the pointer could be seen as
        // stale and the job would be dereferenced from the queue).
        auto shareLock = ostoredb::MemArchiveQueue::sharedAddToQueue(j, j.tapePool, *aReq, *this, logContext);
        double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
        arTotalQueueingTime += qTime;
        aReq->commit();
        double cTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
        arTotalCommitTime += cTime;
        // Now we can let go off the queue.
        shareLock.reset();
        double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
        arTotalQueueUnlockTime += qUnlockTime;
        linkedTapePools.push_back(j.owner);
        log::ScopedParamContainer params(logContext);
        params.add("tapepool", j.tapePool)
              .add("queueObject", j.owner)
              .add("jobObject", aReq->getAddressIfSet())
              .add("queueingTime", qTime)
              .add("commitTime", cTime)
              .add("queueUnlockTime", qUnlockTime);
        logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): added job to queue.");
      }
    } catch (NoSuchArchiveQueue &ex) {
      // Unlink the request from already connected tape pools
      for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) {
        objectstore::ArchiveQueue aq(*tpa, m_objectStore);
        ScopedExclusiveLock aql(aq);
        aq.fetch();
        aq.removeJobsAndCommit({aReq->getAddressIfSet()});
      }
      aReq->remove();
      log::ScopedParamContainer params(logContext);
      params.add("tapepool", currentTapepool)
            .add("archiveRequestObject", aReq->getAddressIfSet())
            .add("exceptionMessage", ex.getMessageValue())
            .add("jobObject", aReq->getAddressIfSet());
      logContext.log(log::ERR, "In OStoreDB::queueArchive_bottomHalf(): failed to enqueue job");
      return;
    }
    // The request is now fully set. As it is multi-owned, we do not set a new
    // owner; we simply disown it from the agent.
    aReq->setOwner("");
    auto archiveFile = aReq->getArchiveFile();
    aReq->commit();
    double arOwnerResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    arl.release();
    double arLockRelease = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    // And remove reference from the agent
    m_agentReference->removeFromOwnership(aReq->getAddressIfSet(), m_objectStore);
    double agOwnershipResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    log::ScopedParamContainer params(logContext);
    params.add("jobObject", aReq->getAddressIfSet())
          .add("fileId", archiveFile.archiveFileID)
          .add("diskInstance", archiveFile.diskInstance)
          .add("diskFilePath", archiveFile.diskFileInfo.path)
          .add("diskFileId", archiveFile.diskFileId)
          .add("creationAndRelockTime", arRelockTime)
          .add("totalQueueingTime", arTotalQueueingTime)
          .add("totalCommitTime", arTotalCommitTime)
          .add("totalQueueUnlockTime", arTotalQueueUnlockTime)
          .add("ownerResetTime", arOwnerResetTime)
          .add("lockReleaseTime", arLockRelease)
          .add("agentOwnershipResetTime", agOwnershipResetTime)
          .add("totalTime", arRelockTime + arTotalQueueingTime + arTotalCommitTime
             + arTotalQueueUnlockTime + arOwnerResetTime + arLockRelease 
             + agOwnershipResetTime);
    logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): Finished enqueueing request.");
  });
  ANNOTATE_HAPPENS_BEFORE(et);
  mlForHelgrind.unlock();
  m_enqueueingTasksQueue.push(et);
  double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
  params.add("taskPostingTime", taskPostingTime)
        .add("taskQueueSize", taskQueueSize)
        .add("totalTime", agentReferencingTime + insertionTime + taskPostingTime);
  logContext.log(log::INFO, "In OStoreDB::queueArchive(): recorded request for queueing (enqueueing posted to thread pool).");  
}

//------------------------------------------------------------------------------
// OStoreDB::ArchiveToFileRequestCancelation::complete()
//------------------------------------------------------------------------------
void OStoreDB::ArchiveToFileRequestCancelation::complete(log::LogContext & lc) {
  if (m_closed)
    throw ArchiveRequestAlreadyDeleted("OStoreDB::ArchiveToFileRequestCancelation::complete(): called twice");
  // We just need to delete the object and forget it
  m_request.remove();
  log::ScopedParamContainer params(lc);
  params.add("archiveRequestObject", m_request.getAddressIfSet());
  lc.log(log::INFO, "In ArchiveToFileRequestCancelation::complete(): removed archive request.");
  m_agentReference.removeFromOwnership(m_request.getAddressIfSet(), m_objectStore);
  m_closed = true;
}

//------------------------------------------------------------------------------
// OStoreDB::ArchiveToFileRequestCancelation::~ArchiveToFileRequestCancelation()
//------------------------------------------------------------------------------
OStoreDB::ArchiveToFileRequestCancelation::~ArchiveToFileRequestCancelation() {
  if (!m_closed) {
    try {
      log::LogContext lc(m_logger);
      m_request.garbageCollect(m_agentReference.getAgentAddress(), m_agentReference, lc, m_catalogue);
      m_agentReference.removeFromOwnership(m_request.getAddressIfSet(), m_objectStore);
    } catch (...) {}
  }
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::ArchiveJob>
  OStoreDB::getArchiveJobs(const std::string &tapePoolName) const
{
  std::list<cta::common::dataStructures::ArchiveJob> ret;

  for(ArchiveQueueItor_t q_it(m_objectStore, tapePoolName); !q_it.end() ; ++q_it) {
    ret.push_back(*q_it);
  }

  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::ArchiveJob>>
   OStoreDB::getArchiveJobs() const
{
  std::map<std::string, std::list<common::dataStructures::ArchiveJob>> ret;

  for(ArchiveQueueItor_t q_it(m_objectStore); !q_it.end(); ++q_it) {
    ret[q_it.qid()].push_back(*q_it);
  }

  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobItor()
//------------------------------------------------------------------------------
OStoreDB::ArchiveQueueItor_t OStoreDB::getArchiveJobItor(const std::string &tapePoolName) const
{
  return ArchiveQueueItor_t(m_objectStore, tapePoolName);
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveQueueStatistics()
//------------------------------------------------------------------------------
std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueueStatistics(
  const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
  const std::set<std::string> & vidsToConsider) {
  return Helpers::getRetrieveQueueStatistics(criteria, vidsToConsider, m_objectStore);
}

//------------------------------------------------------------------------------
// OStoreDB::queueRetrieve()
//------------------------------------------------------------------------------
std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
  const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) {
  assertAgentAddressSet();
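  // As in queueArchive(), this mutex presumably only provides a happens-before
  // edge for Helgrind between the top and bottom halves.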
  auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
  cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
  auto *mutexForHelgrindAddr = mutexForHelgrind.release();
  cta::utils::Timer timer;
  // Get the best vid from the cache
  std::set<std::string> candidateVids;
  for (auto & tf:criteria.archiveFile.tapeFiles) candidateVids.insert(tf.second.vid);
  std::string bestVid=Helpers::selectBestRetrieveQueue(candidateVids, m_catalogue, m_objectStore);
  // Check that the requested retrieve job (for the provided vid) exists, and record the copynb.
  uint64_t bestCopyNb;
  for (auto & tf: criteria.archiveFile.tapeFiles) {
    if (tf.second.vid == bestVid) {
      bestCopyNb = tf.second.copyNb;
      goto vidFound;
    }
  }
  {
    std::stringstream err;
    err << "In OStoreDB::queueRetrieve(): no tape file for requested vid. archiveId=" << criteria.archiveFile.archiveFileID
        << " vid=" << bestVid;
    throw RetrieveRequestHasNoCopies(err.str());
  }
  vidFound:
  // In order to post the job, construct it first in memory.
  auto rReq = cta::make_unique<objectstore::RetrieveRequest> (m_agentReference->nextId("RetrieveRequest"), m_objectStore);
  rReq->initialize();
  rReq->setSchedulerRequest(rqst);
  rReq->setRetrieveFileQueueCriteria(criteria);
  // Find the job corresponding to the vid (and check we indeed have one).
  auto jobs = rReq->getJobs();
  objectstore::RetrieveRequest::JobDump job;
  for (auto & j:jobs) {
    if (j.copyNb == bestCopyNb) {
      job = j;
      goto jobFound;
    }
  }
  {
    std::stringstream err;
    err << "In OStoreDB::queueRetrieve(): no job for requested copyNb. archiveId=" << criteria.archiveFile.archiveFileID
        << " vid=" << bestVid << " copyNb=" << bestCopyNb;
    throw RetrieveRequestHasNoCopies(err.str());
  }
  jobFound:
  {
    // We are ready to enqueue the request. Let's make the data safe and do the rest behind the scenes.
    // Reference the request in the process's agent.
    double vidSelectionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    m_agentReference->addToOwnership(rReq->getAddressIfSet(), m_objectStore);
    double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    rReq->setOwner(m_agentReference->getAgentAddress());
    // "Select" an arbitrary copy number. This is needed to serialize the object.
    rReq->setActiveCopyNumber(criteria.archiveFile.tapeFiles.begin()->second.copyNb);
    rReq->insert();
    double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    m_taskQueueSize++;
    uint64_t taskQueueSize = m_taskQueueSize;
    // Prepare the logs to avoid multithread access on the object.
    log::ScopedParamContainer params(logContext);
    params.add("tapeVid", bestVid)
          .add("jobObject", rReq->getAddressIfSet())
          .add("fileId", rReq->getArchiveFile().archiveFileID)
          .add("diskInstance", rReq->getArchiveFile().diskInstance)
          .add("diskFilePath", rReq->getArchiveFile().diskFileInfo.path)
          .add("diskFileId", rReq->getArchiveFile().diskFileId)
          .add("vidSelectionTime", vidSelectionTime)
          .add("agentReferencingTime", agentReferencingTime)
          .add("insertionTime", insertionTime);
    delayIfNecessary(logContext);
    auto rReqPtr = rReq.release();
    auto * et = new EnqueueingTask([rReqPtr, job, bestVid, mutexForHelgrindAddr, this]{
      std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr);
      std::unique_ptr<objectstore::RetrieveRequest> rReq(rReqPtr);
      cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
      // This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
      auto scopedCounterDecrement = [this](void *){ 
        m_taskQueueSize--;
        m_taskPostingSemaphore.release();
      };
      // A bit ugly, but we need a non-null pointer for the "deleter" to be called.
      std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
      log::LogContext logContext(m_logger);
      utils::Timer timer;
      // Add the request to the queue (with a shared access).
      auto nonConstJob = job;
      objectstore::ScopedExclusiveLock rReqL(*rReq);
      double rLockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      rReq->fetch();
      auto sharedLock = ostoredb::MemRetrieveQueue::sharedAddToQueue(nonConstJob, bestVid, *rReq, *this, logContext);
      double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      // The object ownership was set in SharedAdd.
      // We need to extract the owner before inserting. After, we would need to hold a lock.
      auto owner = rReq->getOwner();
      rReq->commit();
      double cTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      // The lock on the queue is released here (it has to be released after the request commit for consistency).
      sharedLock.reset();
      double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      rReqL.release();
      double rUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      // And remove reference from the agent
      m_agentReference->removeFromOwnership(rReq->getAddressIfSet(), m_objectStore);
      double agOwnershipResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      log::ScopedParamContainer params(logContext);
      params.add("tapeVid", bestVid)
            .add("queueObject", owner)
            .add("jobObject", rReq->getAddressIfSet())
            .add("fileId", rReq->getArchiveFile().archiveFileID)
            .add("diskInstance", rReq->getArchiveFile().diskInstance)
            .add("diskFilePath", rReq->getArchiveFile().diskFileInfo.path)
            .add("diskFileId", rReq->getArchiveFile().diskFileId)
            .add("requestLockTime", rLockTime)
            .add("queueingTime", qTime)
            .add("commitTime", cTime)
            .add("queueUnlockTime", qUnlockTime)
            .add("requestUnlockTime", rUnlockTime)
            .add("agentOwnershipResetTime", agOwnershipResetTime)
            .add("totalTime", rLockTime + qTime + cTime + qUnlockTime 
                + rUnlockTime + agOwnershipResetTime);
      logContext.log(log::INFO, "In OStoreDB::queueRetrieve_bottomHalf(): added job to queue (enqueueing finished).");
    });
    ANNOTATE_HAPPENS_BEFORE(et);
    mlForHelgrind.unlock();
    m_enqueueingTasksQueue.push(et);
    double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
    params.add("taskPostingTime", taskPostingTime)
          .add("taskQueueSize", taskQueueSize)
          .add("totalTime", vidSelectionTime + agentReferencingTime + insertionTime + taskPostingTime);
    logContext.log(log::INFO, "In OStoreDB::queueRetrieve(): recorded request for queueing (enqueueing posted to thread pool).");  
  }
  return bestVid;
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveRequestsByVid()
//------------------------------------------------------------------------------
std::list<RetrieveRequestDump> OStoreDB::getRetrieveRequestsByVid(const std::string& vid) const {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveRequestsByRequester()
//------------------------------------------------------------------------------
std::list<RetrieveRequestDump> OStoreDB::getRetrieveRequestsByRequester(const std::string& requester) const {
  throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveRequests()
//------------------------------------------------------------------------------
std::map<std::string, std::list<RetrieveRequestDump> > OStoreDB::getRetrieveRequests() const {
  throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
//  std::map<cta::Tape, std::list<RetrieveRequestDump> > ret;
//  // Get list of tape pools and then tapes
//  objectstore::RootEntry re(m_objectStore);
//  objectstore::ScopedSharedLock rel(re);
//  re.fetch();
//  auto tpl=re.dumpTapePools();
//  rel.release();
//  for (auto tpp = tpl.begin(); tpp != tpl.end(); tpp++) {
//    // Get the list of tapes for the tape pool
//    objectstore::TapePool tp(tpp->address, m_objectStore);
//    objectstore::ScopedSharedLock tplock(tp);
//    tp.fetch();
//    auto tl = tp.dumpTapes();
//    for (auto tptr = tl.begin(); tptr!= tl.end(); tptr++) {
//      // Get the list of retrieve requests for the tape.
//      objectstore::Tape t(tptr->address, m_objectStore);
//      objectstore::ScopedSharedLock tlock(t);
//      t.fetch();
//      auto jobs = t.dumpAndFetchRetrieveRequests();
//      // If the list is not empty, add to the map.
//      if (jobs.size()) {
//        cta::Tape tkey;
//        // TODO tkey.capacityInBytes;
//        tkey.creationLog = t.getCreationLog();
//        // TODO tkey.dataOnTapeInBytes;
//        tkey.logicalLibraryName = t.getLogicalLibrary();
//        tkey.nbFiles = t.getLastFseq();
//        // TODO tkey.status
//        tkey.tapePoolName = tp.getName();
//        tkey.vid = t.getVid();
//        ret[tkey] = std::move(jobs);
//      }
//    }
//  }
//  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::deleteRetrieveRequest()
//------------------------------------------------------------------------------
void OStoreDB::deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& requester, 
  const std::string& remoteFile) {
  throw exception::Exception("Not Implemented");
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveJobs()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::RetrieveJob>
OStoreDB::getRetrieveJobs(const std::string &vid) const
{
  std::list<common::dataStructures::RetrieveJob> ret;

  for(RetrieveQueueItor_t q_it(m_objectStore, vid); !q_it.end(); ++q_it) {
    ret.push_back(*q_it);
  }

  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveJobs()
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::RetrieveJob>>
OStoreDB::getRetrieveJobs() const
{
  std::map<std::string, std::list<common::dataStructures::RetrieveJob>> ret;

  for(RetrieveQueueItor_t q_it(m_objectStore); !q_it.end(); ++q_it) {
    ret[q_it.qid()].push_back(*q_it);
  }

  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveJobItor()
//------------------------------------------------------------------------------
OStoreDB::RetrieveQueueItor_t OStoreDB::getRetrieveJobItor(const std::string &vid) const
{
  return RetrieveQueueItor_t(m_objectStore, vid);
}


//------------------------------------------------------------------------------
// OStoreDB::getDriveStates()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::DriveState> OStoreDB::getDriveStates(log::LogContext & lc) const {
  return Helpers::getAllDriveStates(m_objectStore, lc);
}

//------------------------------------------------------------------------------
// OStoreDB::setDesiredDriveState()
//------------------------------------------------------------------------------
void OStoreDB::setDesiredDriveState(const std::string& drive, const common::dataStructures::DesiredDriveState & desiredState, log::LogContext &lc) {
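  // Update only the desired-state part of the drive state, leaving the rest of
  // the fetched state untouched.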
  objectstore::DriveState ds(m_objectStore);
  ScopedExclusiveLock dsl;
  Helpers::getLockedAndFetchedDriveState(ds, dsl, *m_agentReference, drive, lc);
  auto driveState = ds.getState();
  driveState.desiredDriveState = desiredState;
  ds.setState(driveState);
  ds.commit();
}

//------------------------------------------------------------------------------
// OStoreDB::removeDrive()
//------------------------------------------------------------------------------
void OStoreDB::removeDrive(const std::string& drive, log::LogContext &lc) {
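  // Resolve the drive register through the root entry and take an exclusive
  // lock on it before removing the drive.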
  RootEntry re(m_objectStore);
  re.fetchNoLock();
  auto driveRegisterAddress = re.getDriveRegisterAddress();
  objectstore::DriveRegister dr(driveRegisterAddress, m_objectStore);
  objectstore::ScopedExclusiveLock drl(dr);
  dr.fetch();