/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "OStoreDB.hpp"
#include "MemQueues.hpp"
#include "common/dataStructures/SecurityIdentity.hpp"
#include "objectstore/RootEntry.hpp"
#include "objectstore/ArchiveQueue.hpp"
#include "objectstore/RetrieveQueue.hpp"
#include "objectstore/DriveRegister.hpp"
#include "objectstore/ArchiveRequest.hpp"
#include "objectstore/RetrieveRequest.hpp"
#include "common/exception/Exception.hpp"
#include "common/admin/AdminHost.hpp"
#include "common/admin/AdminUser.hpp"
#include "common/archiveRoutes/ArchiveRoute.hpp"
#include "common/utils/utils.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "common/TapePool.hpp"
#include "common/dataStructures/MountPolicy.hpp"

#include <algorithm>
#include <limits>       /* std::numeric_limits, used in queueArchive() */
#include <stdlib.h>     /* srand, rand */
#include <time.h>       /* time */
#include <stdexcept>
#include <set>
#include <iostream>

namespace cta {

using namespace objectstore;

//------------------------------------------------------------------------------
// OStoreDB::OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::OStoreDB(objectstore::Backend& be):
  m_objectStore(be) {}

//------------------------------------------------------------------------------
// OStoreDB::~OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::~OStoreDB() throw() {}

//------------------------------------------------------------------------------
// OStoreDB::setAgentReference()
//------------------------------------------------------------------------------
void OStoreDB::setAgentReference(objectstore::AgentReference *agentReference) {
  m_agentReference = agentReference;
}

//------------------------------------------------------------------------------
// OStoreDB::assertAgentAddressSet()
//------------------------------------------------------------------------------
void OStoreDB::assertAgentAddressSet() {
  if (!m_agentReference)
    throw AgentNotSet("In OStoreDB::assertAgentAddressSet: Agent address not set");
}

//------------------------------------------------------------------------------
// OStoreDB::ping()
//------------------------------------------------------------------------------
void OStoreDB::ping() {
  // Validate we can lock and fetch the root entry.
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
}

//------------------------------------------------------------------------------
// OStoreDB::getMountInfo()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> 
  OStoreDB::getMountInfo() {
  // Allocate the getMountInfo structure to return.
  assertAgentAddressSet();
  std::unique_ptr<OStoreDB::TapeMountDecisionInfo> privateRet (new OStoreDB::TapeMountDecisionInfo(
    m_objectStore, *m_agentReference));
  TapeMountDecisionInfo & tmdi=*privateRet;
  // Get all the tape pools and tapes with queues (potential mounts)
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  // Take an exclusive lock on the scheduler global lock and fetch it.
  tmdi.m_schedulerGlobalLock.reset(
    new SchedulerGlobalLock(re.getSchedulerGlobalLock(), m_objectStore));
  tmdi.m_lockOnSchedulerGlobalLock.lock(*tmdi.m_schedulerGlobalLock);
  tmdi.m_lockTaken = true;
  tmdi.m_schedulerGlobalLock->fetch();
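  // Note: the scheduler global lock is held (exclusively) for the lifetime of the
  // returned structure, which serializes mount decisions across competing schedulers.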
  // Walk the archive queues for statistics
  for (auto & aqp: re.dumpArchiveQueues()) {
    objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) poolName = aqp.tapePool;
    objectstore::ScopedSharedLock aqlock(aqueue);
    aqueue.fetch();
    // If there are files queued, we create an entry for this tape pool in the
    // mount candidates list.
    if (aqueue.getJobsSummary().files) {
      tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
      auto & m = tmdi.potentialMounts.back();
      m.tapePool = aqp.tapePool;
      m.type = cta::common::dataStructures::MountType::Archive;
      m.bytesQueued = aqueue.getJobsSummary().bytes;
      m.filesQueued = aqueue.getJobsSummary().files;
      m.oldestJobStartTime = aqueue.getJobsSummary().oldestJobStartTime;
      m.priority = aqueue.getJobsSummary().priority;
      m.maxDrivesAllowed = aqueue.getJobsSummary().maxDrivesAllowed;
      m.minArchiveRequestAge = aqueue.getJobsSummary().minArchiveRequestAge;
      m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
    }
  }
  // Walk the retrieve queues for statistics
  for (auto & rqp: re.dumpRetrieveQueues()) {
    RetrieveQueue rqueue(rqp.address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) vid = rqp.vid;
    ScopedSharedLock rqlock(rqueue);
    rqueue.fetch();
    // If there are files queued, we create an entry for this retrieve queue in the
    // mount candidates list.
    if (rqueue.getJobsSummary().files) {
      tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
      auto & m = tmdi.potentialMounts.back();
      m.vid = rqp.vid;
      m.type = cta::common::dataStructures::MountType::Retrieve;
      m.bytesQueued = rqueue.getJobsSummary().bytes;
      m.filesQueued = rqueue.getJobsSummary().files;
      m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime;
      m.priority = rqueue.getJobsSummary().priority;
      m.maxDrivesAllowed = rqueue.getJobsSummary().maxDrivesAllowed;
      m.minArchiveRequestAge = rqueue.getJobsSummary().minArchiveRequestAge;
      m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
    }
  }
  // Collect information about the existing and next mounts
  // If a next mount exists it "counts double", but the corresponding drive
  // is either about to mount, or about to replace its current mount.
  objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
  objectstore::ScopedSharedLock drl(dr);
  dr.fetch();
  auto dl = dr.getAllDrivesState();
  using common::dataStructures::DriveStatus;
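  // A drive counts as an existing mount while its reported status is anywhere
  // between session start and unmount; idle and down drives are not counted.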
  std::set<int> activeDriveStatuses = {
    (int)cta::common::dataStructures::DriveStatus::Starting,
    (int)cta::common::dataStructures::DriveStatus::Mounting,
    (int)cta::common::dataStructures::DriveStatus::Transfering,
    (int)cta::common::dataStructures::DriveStatus::Unloading,
    (int)cta::common::dataStructures::DriveStatus::Unmounting,
    (int)cta::common::dataStructures::DriveStatus::DrainingToDisk };
  std::set<int> activeMountTypes = {
    (int)cta::common::dataStructures::MountType::Archive,
    (int)cta::common::dataStructures::MountType::Retrieve,
    (int)cta::common::dataStructures::MountType::Label };
167
  for (auto d=dl.begin(); d!= dl.end(); d++) {
168
    if (activeDriveStatuses.count((int)d->driveStatus)) {
169
170
171
172
      tmdi.existingOrNextMounts.push_back(ExistingMount());
      tmdi.existingOrNextMounts.back().type = d->mountType;
      tmdi.existingOrNextMounts.back().tapePool = d->currentTapePool;
      tmdi.existingOrNextMounts.back().driveName = d->driveName;
173
      tmdi.existingOrNextMounts.back().vid = d->currentVid;
174
175
176
177
178
179
    }
    if (activeMountTypes.count((int)d->nextMountType)) {
      tmdi.existingOrNextMounts.push_back(ExistingMount());
      tmdi.existingOrNextMounts.back().type = d->nextMountType;
      tmdi.existingOrNextMounts.back().tapePool = d->nextTapepool;
      tmdi.existingOrNextMounts.back().driveName = d->driveName;
180
      tmdi.existingOrNextMounts.back().vid = d->nextVid;
181
182
183
184
    }
  }
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
  return ret;
185
}
186
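
// Previous implementation, predating the ArchiveQueue/RetrieveQueue-based
// scheduling; retained below for reference: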
/* Old getMountInfo
//------------------------------------------------------------------------------
// OStoreDB::getMountInfo()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> 
  OStoreDB::getMountInfo() {
  //Allocate the getMountInfostructure to return.
  assertAgentSet();
  std::unique_ptr<TapeMountDecisionInfo> privateRet (new TapeMountDecisionInfo(
    m_objectStore, *m_agent));
  TapeMountDecisionInfo & tmdi=*privateRet;
  // Get all the tape pools and tapes with queues (potential mounts)
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  // Take an exclusive lock on the scheduling and fetch it.
  tmdi.m_schedulerGlobalLock.reset(
    new SchedulerGlobalLock(re.getSchedulerGlobalLock(), m_objectStore));
  tmdi.m_lockOnSchedulerGlobalLock.lock(*tmdi.m_schedulerGlobalLock);
  tmdi.m_lockTaken = true;
  tmdi.m_schedulerGlobalLock->fetch();
  auto tpl = re.dumpTapePools();
  for (auto tpp=tpl.begin(); tpp!=tpl.end(); tpp++) {
    // Get the tape pool object
    objectstore::TapePool tpool(tpp->address, m_objectStore);
    // debug utility variable
    std::string __attribute__((__unused__)) poolName = tpp->tapePool;
    objectstore::ScopedSharedLock tpl(tpool);
    tpool.fetch();
    // If there are files queued, we create an entry for this tape pool in the
    // mount candidates list.
    if (tpool.getJobsSummary().files) {
      tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
      auto & m = tmdi.potentialMounts.back();
      m.tapePool = tpp->tapePool;
      m.type = cta::MountType::ARCHIVE;
      m.bytesQueued = tpool.getJobsSummary().bytes;
      m.filesQueued = tpool.getJobsSummary().files;      
      m.oldestJobStartTime = tpool.getJobsSummary().oldestJobStartTime;
      m.priority = tpool.getJobsSummary().priority;
      
      m.mountCriteria.maxFilesQueued = 
          tpool.getMountCriteriaByDirection().archive.maxFilesQueued;
      m.mountCriteria.maxBytesQueued = 
          tpool.getMountCriteriaByDirection().archive.maxBytesQueued;
      m.mountCriteria.maxAge = 
          tpool.getMountCriteriaByDirection().archive.maxAge;
      m.mountCriteria.quota = 
          tpool.getMountCriteriaByDirection().archive.quota;
      m.logicalLibrary = "";

    }
    // For each tape in the pool, list the tapes with work
    auto tl = tpool.dumpTapesAndFetchStatus();
    for (auto tp = tl.begin(); tp!= tl.end(); tp++) {
      objectstore::Tape t(tp->address, m_objectStore);
      objectstore::ScopedSharedLock tl(t);
      t.fetch();
      if (t.getJobsSummary().files) {
        tmdi.potentialMounts.push_back(PotentialMount());
        auto & m = tmdi.potentialMounts.back();
        m.type = cta::MountType::RETRIEVE;
        m.bytesQueued = t.getJobsSummary().bytes;
        m.filesQueued = t.getJobsSummary().files;
        m.oldestJobStartTime = t.getJobsSummary().oldestJobStartTime;
        m.priority = t.getJobsSummary().priority;
        m.vid = t.getVid();
        m.logicalLibrary = t.getLogicalLibrary();
        
        m.mountCriteria.maxFilesQueued = 
            tpool.getMountCriteriaByDirection().retrieve.maxFilesQueued;
        m.mountCriteria.maxBytesQueued = 
            tpool.getMountCriteriaByDirection().retrieve.maxBytesQueued;
        m.mountCriteria.maxAge = 
            tpool.getMountCriteriaByDirection().retrieve.maxAge;
        m.mountCriteria.quota = 
            tpool.getMountCriteriaByDirection().retrieve.quota;
        m.logicalLibrary = t.getLogicalLibrary();
      }
    }
  }
  // Dedication information comes here
  // TODO
  // 
  // Collect information about the existing mounts
  objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
  objectstore::ScopedSharedLock drl(dr);
  dr.fetch();
  auto dl = dr.dumpDrives();
  using common::DriveStatus;
  std::set<int> activeDriveStatuses = {
    (int)DriveStatus::Starting,
    (int)DriveStatus::Mounting,
    (int)DriveStatus::Transfering,
    (int)DriveStatus::Unloading,
    (int)DriveStatus::Unmounting,
    (int)DriveStatus::DrainingToDisk };
  for (auto d=dl.begin(); d!= dl.end(); d++) {
    if (activeDriveStatuses.count((int)d->status)) {
      tmdi.existingMounts.push_back(ExistingMount());
      tmdi.existingMounts.back().type = d->mountType;
      tmdi.existingMounts.back().tapePool = d->currentTapePool;
    }
  }
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
  return ret;
}
*/

//------------------------------------------------------------------------------
// OStoreDB::queueArchive()
//------------------------------------------------------------------------------
void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, 
        const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria, log::LogContext &logContext) {
  assertAgentAddressSet();
  // Construct the archive request object in memory
  cta::objectstore::ArchiveRequest aReq(m_agentReference->nextId("ArchiveRequest"), m_objectStore);
  aReq.initialize();
  // Summarize all as an archiveFile
  cta::common::dataStructures::ArchiveFile aFile;
  aFile.archiveFileID = criteria.fileId;
  aFile.checksumType = request.checksumType;
  aFile.checksumValue = request.checksumValue;
  // TODO: a fully fledged archive file should not be the representation of the request.
  aFile.creationTime = std::numeric_limits<decltype(aFile.creationTime)>::min();
  aFile.reconciliationTime = 0;
  aFile.diskFileId = request.diskFileID;
  aFile.diskFileInfo = request.diskFileInfo;
  aFile.diskInstance = instanceName;
  aFile.fileSize = request.fileSize;
  aFile.storageClass = request.storageClass;
  aReq.setArchiveFile(aFile);
  aReq.setMountPolicy(criteria.mountPolicy);
  aReq.setArchiveReportURL(request.archiveReportURL);
  aReq.setRequester(request.requester);
  aReq.setSrcURL(request.srcURL);
  aReq.setEntryLog(request.creationLog);
  std::list<cta::objectstore::ArchiveRequest::JobDump> jl;
  for (auto & copy:criteria.copyToPoolMap) {
    const uint32_t hardcodedRetriesWithinMount = 3;
    const uint32_t hardcodedTotalRetries = 6;
    aReq.addJob(copy.first, copy.second, "archive queue address to be set later", hardcodedRetriesWithinMount, hardcodedTotalRetries);
    jl.push_back(cta::objectstore::ArchiveRequest::JobDump());
    jl.back().copyNb = copy.first;
    jl.back().tapePool = copy.second;
  }
  if (jl.empty()) {
    throw ArchiveRequestHasNoCopies("In OStoreDB::queueArchive: the archive request has no copy");
  }
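  // From here on, the usual object store protocol applies: the request is first
  // inserted while owned by this agent, so that if the process dies before the
  // request is safely linked to its queue(s), the garbage collector can pick it
  // up from the agent's ownership list.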
  // We create the object here
  m_agentReference->addToOwnership(aReq.getAddressIfSet(), m_objectStore);
  aReq.setOwner(m_agentReference->getAgentAddress());
  aReq.insert();
  ScopedExclusiveLock arl(aReq);
  // We can now enqueue the requests
  std::list<std::string> linkedTapePools;
  std::string currentTapepool;
  try {
    for (auto &j: aReq.dumpJobs()) {
      currentTapepool = j.tapePool;
      // The shared lock will be released automatically at the end of this scope.
      // The queueing implicitly sets the job owner as the queue (as it should be). The queue should
      // not be unlocked before we commit the archive request (otherwise, the pointer could be seen
      // as stale and the job would be dereferenced from the queue).
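      // sharedAddToArchiveQueue (see MemQueues.hpp) batches jobs from concurrent
      // threads targeting the same tape pool, so that a single object store
      // round trip can queue many jobs at once.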
      auto shareLock = ostoredb::MemArchiveQueue::sharedAddToArchiveQueue(j, aReq, *this, logContext);
      aReq.commit();
      // Now we can let go of the queue.
      shareLock.reset();
      linkedTapePools.push_back(j.ArchiveQueueAddress);
      log::ScopedParamContainer params(logContext);
      params.add("tapepool", j.tapePool)
            .add("queueObject", j.ArchiveQueueAddress)
            .add("jobObject", aReq.getAddressIfSet());
      logContext.log(log::INFO, "In OStoreDB::queueArchive(): added job to queue");
    }
  } catch (NoSuchArchiveQueue &) {
    // Unlink the request from the already linked tape pools
    for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) {
      objectstore::ArchiveQueue aq(*tpa, m_objectStore);
      ScopedExclusiveLock aql(aq);
      aq.fetch();
      aq.removeJob(aReq.getAddressIfSet());
      aq.commit();
    }
    // Delete the request itself, now that it is dequeued from everywhere.
    aReq.remove();
    log::ScopedParamContainer params(logContext);
    params.add("tapepool", currentTapepool)
          .add("jobObject", aReq.getAddressIfSet());
    logContext.log(log::INFO, "In OStoreDB::queueArchive(): failed to enqueue job");
    throw;
  }
  // The request is now fully set. As it is multi-owned by its queues, we do not
  // set a single owner; we just disown it from the agent.
  aReq.setOwner("");
  aReq.commit();
  arl.release();
  // And remove the reference from the agent
  m_agentReference->removeFromOwnership(aReq.getAddressIfSet(), m_objectStore);
}

//------------------------------------------------------------------------------
// OStoreDB::deleteArchiveRequest()
//------------------------------------------------------------------------------
void OStoreDB::deleteArchiveRequest(const std::string &diskInstanceName, 
  uint64_t fileId) {
  // First off, find the archive request from all the tape pools.
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  auto aql = re.dumpArchiveQueues();
  rel.release();
  for (auto & aqp: aql) {
    objectstore::ArchiveQueue aq(aqp.address, m_objectStore);
    ScopedSharedLock aqlock(aq);
    aq.fetch();
    auto ajl=aq.dumpJobs();
    aqlock.release();
    for (auto & ajp: ajl) {
      objectstore::ArchiveRequest ar(ajp.address, m_objectStore);
      ScopedSharedLock arl(ar);
      ar.fetch();
      if (ar.getArchiveFile().archiveFileID == fileId) {
        // We found a job for the right file Id.
        // We now need to dequeue it from all its archive queues (one per job).
        // Upgrade the lock to an exclusive one.
        arl.release();
        ScopedExclusiveLock arxl(ar);
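        // Take ownership of the request before dismantling it, so that a crash
        // from this point on leaves it referenced by our agent and therefore
        // recoverable by garbage collection.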
        m_agentReference->addToOwnership(ar.getAddressIfSet(), m_objectStore);
        ar.fetch();
        ar.setAllJobsFailed();
        for (auto j:ar.dumpJobs()) {
          // Dequeue the job from the queue.
          // The owner might not be a queue, in which case the fetch will fail (and that is fine).
          try {
            // The queue on which we found the job is not locked anymore, so we can re-lock it.
            ArchiveQueue aq2(j.ArchiveQueueAddress, m_objectStore);
            ScopedExclusiveLock aq2xl(aq2);
            aq2.fetch();
            aq2.removeJob(ar.getAddressIfSet());
            aq2.commit();
          } catch (...) {}
          ar.setJobOwner(j.copyNb, m_agentReference->getAgentAddress());
        }
        ar.remove();
        m_agentReference->removeFromOwnership(ar.getAddressIfSet(), m_objectStore);
        // We found and deleted the job: return.
        return;
      }
    }
  }
  throw NoSuchArchiveRequest("In OStoreDB::deleteArchiveRequest: ArchiveToFileRequest not found");
}

//------------------------------------------------------------------------------
// OStoreDB::markArchiveRequestForDeletion()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation>
  OStoreDB::markArchiveRequestForDeletion(const common::dataStructures::SecurityIdentity& requester,
  uint64_t fileId) {
  assertAgentAddressSet();
  // Construct the return value immediately
  std::unique_ptr<cta::OStoreDB::ArchiveToFileRequestCancelation>
    internalRet(new cta::OStoreDB::ArchiveToFileRequestCancelation(*m_agentReference, m_objectStore));
  cta::objectstore::ArchiveRequest & ar = internalRet->m_request;
  cta::objectstore::ScopedExclusiveLock & atfrl = internalRet->m_lock;
  // Attempt to find the request
  objectstore::RootEntry re(m_objectStore);
  ScopedSharedLock rel(re);
  re.fetch();
  auto tpl=re.dumpArchiveQueues();
  rel.release();
  for (auto tpp=tpl.begin(); tpp!=tpl.end(); tpp++) {
    try {
      objectstore::ArchiveQueue aq(tpp->address, m_objectStore);
      ScopedSharedLock aqlock(aq);
      aq.fetch();
      auto arl = aq.dumpJobs();
      aqlock.release();
      for (auto arp=arl.begin(); arp!=arl.end(); arp++) {
        objectstore::ArchiveRequest tar(arp->address, m_objectStore);
        objectstore::ScopedSharedLock tatfrl(tar);
        tar.fetch();
        if (tar.getArchiveFile().archiveFileID == fileId) {
          // Point the agent to the request
          m_agentReference->addToOwnership(arp->address, m_objectStore);
          // Mark all jobs as pending NS deletion (so that they get deleted themselves)
          tatfrl.release();
          ar.setAddress(arp->address);
          atfrl.lock(ar);
          ar.fetch();
          ar.setAllJobsPendingNSdeletion();
          ar.commit();
          // Unlink the jobs from the tape pools (the request is safely referenced in the agent)
          auto arJobs=ar.dumpJobs();
          for (auto atpp=arJobs.begin(); atpp!=arJobs.end(); atpp++) {
            objectstore::ArchiveQueue aqp(atpp->ArchiveQueueAddress, m_objectStore);
            objectstore::ScopedExclusiveLock atpl(aqp);
            aqp.fetch();
            aqp.removeJob(arp->address);
            aqp.commit();
          }
          // Return the object to the caller, so complete() can be called later.
          std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCancelation> ret;
          ret.reset(internalRet.release());
          return ret;
        }
      }
    } catch (...) {}
  }
  throw NoSuchArchiveRequest("In OStoreDB::markArchiveRequestForDeletion: ArchiveToFileRequest not found");
}

//------------------------------------------------------------------------------
// OStoreDB::ArchiveToFileRequestCancelation::complete()
//------------------------------------------------------------------------------
void OStoreDB::ArchiveToFileRequestCancelation::complete() {
  if (m_closed)
    throw ArchiveRequestAlreadyDeleted("OStoreDB::ArchiveToFileRequestCancelation::complete(): called twice");
  // We just need to delete the object and forget it
  m_request.remove();
  m_agentReference.removeFromOwnership(m_request.getAddressIfSet(), m_objectStore);
  m_closed = true;
}

//------------------------------------------------------------------------------
// OStoreDB::ArchiveToFileRequestCancelation::~ArchiveToFileRequestCancelation()
//------------------------------------------------------------------------------
OStoreDB::ArchiveToFileRequestCancelation::~ArchiveToFileRequestCancelation() {
  if (!m_closed) {
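    // complete() was never called: hand the request over to garbage collection
    // rather than leaking it in the object store.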
    try {
      m_request.garbageCollect(m_agentReference.getAgentAddress());
      m_agentReference.removeFromOwnership(m_request.getAddressIfSet(), m_objectStore);
    } catch (...) {}
  }
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveRequests()
//------------------------------------------------------------------------------
//std::map<std::string, std::list<ArchiveToTapeCopyRequest> >
//  OStoreDB::getArchiveRequests() const {
//  objectstore::RootEntry re(m_objectStore);
//  objectstore::ScopedSharedLock rel(re);
//  re.fetch();
//  std::map<std::string, std::list<ArchiveToTapeCopyRequest> > ret;
//  auto aql = re.dumpArchiveQueues();
//  rel.release();
//  for (auto & aqp:aql) {
//    objectstore::ArchiveQueue osaq(aqp.address, m_objectStore);
//    ScopedSharedLock osaql(osaq);
//    osaq.fetch();  
//    auto arl = osaq.dumpJobs();
//    osaql.release();
//    for (auto & ar: arl) {
//      objectstore::ArchiveRequest osar(ar.address, m_objectStore);
//      ScopedSharedLock osarl(osar);
//      osar.fetch();
//      // Find which copy number is for this tape pool.
//      // skip the request if not found
//      auto jl = osar.dumpJobs();
//      uint16_t copynb;
//      bool copyndFound=false;
//      for (auto & j:jl) {
//        if (j.tapePool == aqp.tapePool) {
//          copynb = j.copyNb;
//          copyndFound = true;
//          break;
//        }
//      }
//      if (!copyndFound) continue;
//      ret[aqp.tapePool].push_back(cta::ArchiveToTapeCopyRequest(
//        osar.getDiskFileID(),
//        osar.getArchiveFileID(),
//        copynb,
//        aqp.tapePool,
//        osar.getMountPolicy().archivePriority,
//        osar.getCreationLog()));
//    }
//  }
//  return ret;
//}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::ArchiveJob>
  OStoreDB::getArchiveJobs(const std::string& tapePoolName) const {
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  auto tpl = re.dumpArchiveQueues();
  rel.release();
  for (auto & tpp:tpl) {
    if (tpp.tapePool != tapePoolName) continue;
    std::list<cta::common::dataStructures::ArchiveJob> ret;
    objectstore::ArchiveQueue osaq(tpp.address, m_objectStore);
    ScopedSharedLock ostpl(osaq);
    osaq.fetch();
    auto arl = osaq.dumpJobs();
    ostpl.release();
    for (auto ar=arl.begin(); ar!=arl.end(); ar++) {
      objectstore::ArchiveRequest osar(ar->address, m_objectStore);
      ScopedSharedLock osarl(osar);
      osar.fetch();
      // Find which copy number is for this tape pool.
      // Skip the request if not found.
      uint16_t copynb;
      bool copyNbFound=false;
      for (auto & j:osar.dumpJobs()) {
        if (j.tapePool == tpp.tapePool) {
          copynb = j.copyNb;
          copyNbFound = true;
          break;
        }
      }
      if (!copyNbFound) continue;
      ret.push_back(cta::common::dataStructures::ArchiveJob());
      ret.back().archiveFileID = osar.getArchiveFile().archiveFileID;
      ret.back().copyNumber = copynb;
      ret.back().tapePool = tpp.tapePool;
      ret.back().request.checksumType = osar.getArchiveFile().checksumType;
      ret.back().request.checksumValue = osar.getArchiveFile().checksumValue;
      ret.back().request.creationLog = osar.getEntryLog();
      ret.back().request.diskFileID = osar.getArchiveFile().diskFileId;
      ret.back().instanceName = osar.getArchiveFile().diskInstance;
      ret.back().request.requester = osar.getRequester();
      ret.back().request.srcURL = osar.getSrcURL();
      ret.back().request.archiveReportURL = osar.getArchiveReportURL();
      ret.back().request.storageClass = osar.getArchiveFile().storageClass;
    }
    return ret;
  }
  return std::list<cta::common::dataStructures::ArchiveJob>();
}

//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobs()
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::ArchiveJob> >
  OStoreDB::getArchiveJobs() const {
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  auto tpl = re.dumpArchiveQueues();
  rel.release();
  std::map<std::string, std::list<common::dataStructures::ArchiveJob> > ret;
  for (auto & tpp:tpl) {
    objectstore::ArchiveQueue osaq(tpp.address, m_objectStore);
    ScopedSharedLock ostpl(osaq);
    osaq.fetch();
    auto arl = osaq.dumpJobs();
    ostpl.release();
    for (auto ar=arl.begin(); ar!=arl.end(); ar++) {
      objectstore::ArchiveRequest osar(ar->address, m_objectStore);
      ScopedSharedLock osarl(osar);
      osar.fetch();
      // Find which copy number is for this tape pool.
      // Skip the request if not found.
      uint16_t copynb;
      bool copyNbFound=false;
      for (auto & j:osar.dumpJobs()) {
        if (j.tapePool == tpp.tapePool) {
          copynb = j.copyNb;
          copyNbFound = true;
          break;
        }
      }
      if (!copyNbFound) continue;
      ret[tpp.tapePool].push_back(cta::common::dataStructures::ArchiveJob());
      ret[tpp.tapePool].back().archiveFileID = osar.getArchiveFile().archiveFileID;
      ret[tpp.tapePool].back().copyNumber = copynb;
      ret[tpp.tapePool].back().tapePool = tpp.tapePool;
      ret[tpp.tapePool].back().request.checksumType = osar.getArchiveFile().checksumType;
      ret[tpp.tapePool].back().request.checksumValue = osar.getArchiveFile().checksumValue;
      ret[tpp.tapePool].back().request.creationLog = osar.getEntryLog();
      ret[tpp.tapePool].back().request.diskFileID = osar.getArchiveFile().diskFileId;
      ret[tpp.tapePool].back().request.diskFileInfo = osar.getArchiveFile().diskFileInfo;
      ret[tpp.tapePool].back().request.fileSize = osar.getArchiveFile().fileSize;
      ret[tpp.tapePool].back().instanceName = osar.getArchiveFile().diskInstance;
      ret[tpp.tapePool].back().request.requester = osar.getRequester();
      ret[tpp.tapePool].back().request.srcURL = osar.getSrcURL();
      ret[tpp.tapePool].back().request.archiveReportURL = osar.getArchiveReportURL();
      ret[tpp.tapePool].back().request.storageClass = osar.getArchiveFile().storageClass;
    }
  }
  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveQueueStatistics()
//------------------------------------------------------------------------------
std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueueStatistics(
  const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
  const std::set<std::string> & vidsToConsider) {
  std::list<RetrieveQueueStatistics> ret;
  // Find the retrieve queues for each vid if they exist (absence is possible).
  RootEntry re(m_objectStore);
  ScopedSharedLock rel(re);
  re.fetch();
  rel.release();
  for (auto &tf:criteria.archiveFile.tapeFiles) {
    if (!vidsToConsider.count(tf.second.vid))
      continue;
    std::string rqAddr;
    try {
      rqAddr = re.getRetrieveQueue(tf.second.vid);
    } catch (cta::exception::Exception &) {
      // No queue for this vid yet: report empty statistics.
      ret.push_back(RetrieveQueueStatistics());
      ret.back().vid=tf.second.vid;
      ret.back().bytesQueued=0;
      ret.back().currentPriority=0;
      ret.back().filesQueued=0;
      continue;
    }
    RetrieveQueue rq(rqAddr, m_objectStore);
    ScopedSharedLock rql(rq);
    rq.fetch();
    rql.release();
    if (rq.getVid() != tf.second.vid)
      throw cta::exception::Exception("In OStoreDB::getRetrieveQueueStatistics(): unexpected vid for retrieve queue");
    ret.push_back(RetrieveQueueStatistics());
    ret.back().vid=rq.getVid();
    ret.back().currentPriority=rq.getJobsSummary().priority;
    ret.back().bytesQueued=rq.getJobsSummary().bytes;
    ret.back().filesQueued=rq.getJobsSummary().files;
  }
  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::queueRetrieve()
//------------------------------------------------------------------------------
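// The vid is chosen by the caller, typically based on the per-vid statistics
// returned by getRetrieveQueueStatistics() above.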
void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
  const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
  const std::string &vid) {
  assertAgentAddressSet();
  // Check that the requested retrieve job (for the provided vid) exists.
  if (!std::count_if(criteria.archiveFile.tapeFiles.cbegin(), 
                     criteria.archiveFile.tapeFiles.cend(),
                     [vid](decltype(*criteria.archiveFile.tapeFiles.cbegin()) & tf){ return tf.second.vid == vid; }))
    throw RetrieveRequestHasNoCopies("In OStoreDB::queueRetrieve(): no tape file for requested vid.");
  // In order to post the job, construct it first in memory.
  objectstore::RetrieveRequest rReq(m_agentReference->nextId("RetrieveToFileRequest"), m_objectStore);
  rReq.initialize();
  rReq.setSchedulerRequest(rqst);
  rReq.setRetrieveFileQueueCriteria(criteria);
  // Point to the request in the agent
  m_agentReference->addToOwnership(rReq.getAddressIfSet(), m_objectStore);
  // Set an arbitrary copy number so we can serialize. Garbage collection would
  // re-choose the copy number and override it in case of problems; we set the
  // real one further down.
  rReq.setActiveCopyNumber(1);
  rReq.insert();
  ScopedExclusiveLock rrl(rReq);
  // Find the retrieve queue (or create it if necessary)
  RootEntry re(m_objectStore);
  ScopedExclusiveLock rel(re);
  re.fetch();
  auto rqAddr=re.addOrGetRetrieveQueueAndCommit(vid, *m_agentReference);
  // We have the queue address: release the root entry and fetch the queue.
  rel.release();
  RetrieveQueue rq(rqAddr, m_objectStore);
  ScopedExclusiveLock rql(rq);
  rq.fetch();
  // We need to find the job corresponding to the vid
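  // Ordering matters below: the job is added to the queue and the queue committed
  // before the request records the queue as its owner. A crash in between leaves
  // the request still owned by our agent, and thus garbage-collectable.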
  for (auto & j: rReq.getArchiveFile().tapeFiles) {
    if (j.second.vid == vid) {
      rq.addJob(j.second.copyNb, rReq.getAddressIfSet(), criteria.archiveFile.fileSize, criteria.mountPolicy, rReq.getEntryLog().time);
      rReq.setActiveCopyNumber(j.second.copyNb);
      goto jobAdded;
    }
  }
  // Getting here means the expected job was not found. This is an internal error.
  throw cta::exception::Exception("In OStoreDB::queueRetrieve(): job not found for this vid");
  jobAdded:;
  // We can now commit the queue.
  rq.commit();
  rql.release();
  // Set the request ownership.
  rReq.setOwner(rqAddr);
  rReq.commit();
  rrl.release();
  // And relinquish ownership from the agent
  m_agentReference->removeFromOwnership(rReq.getAddressIfSet(), m_objectStore);
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveRequestsByVid()
//------------------------------------------------------------------------------
std::list<RetrieveRequestDump> OStoreDB::getRetrieveRequestsByVid(const std::string& vid) const {
  throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveRequestsByRequester()
//------------------------------------------------------------------------------
std::list<RetrieveRequestDump> OStoreDB::getRetrieveRequestsByRequester(const std::string& requester) const {
  throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveRequests()
//------------------------------------------------------------------------------
std::map<std::string, std::list<RetrieveRequestDump> > OStoreDB::getRetrieveRequests() const {
  throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
//  std::map<cta::Tape, std::list<RetrieveRequestDump> > ret;
//  // Get list of tape pools and then tapes
//  objectstore::RootEntry re(m_objectStore);
//  objectstore::ScopedSharedLock rel(re);
//  re.fetch();
//  auto tpl=re.dumpTapePools();
//  rel.release();
//  for (auto tpp = tpl.begin(); tpp != tpl.end(); tpp++) {
//    // Get the list of tapes for the tape pool
//    objectstore::TapePool tp(tpp->address, m_objectStore);
//    objectstore::ScopedSharedLock tplock(tp);
//    tp.fetch();
//    auto tl = tp.dumpTapes();
//    for (auto tptr = tl.begin(); tptr!= tl.end(); tptr++) {
//      // Get the list of retrieve requests for the tape.
//      objectstore::Tape t(tptr->address, m_objectStore);
//      objectstore::ScopedSharedLock tlock(t);
//      t.fetch();
//      auto jobs = t.dumpAndFetchRetrieveRequests();
//      // If the list is not empty, add to the map.
//      if (jobs.size()) {
//        cta::Tape tkey;
//        // TODO tkey.capacityInBytes;
//        tkey.creationLog = t.getCreationLog();
//        // TODO tkey.dataOnTapeInBytes;
//        tkey.logicalLibraryName = t.getLogicalLibrary();
//        tkey.nbFiles = t.getLastFseq();
//        // TODO tkey.status
//        tkey.tapePoolName = tp.getName();
//        tkey.vid = t.getVid();
//        ret[tkey] = std::move(jobs);
//      }
//    }
//  }
//  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::deleteRetrieveRequest()
//------------------------------------------------------------------------------
void OStoreDB::deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& requester,
  const std::string& remoteFile) {
  throw exception::Exception("Not Implemented");
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveJobs()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::RetrieveJob> OStoreDB::getRetrieveJobs(const std::string& tapePoolName) const {
  throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}

//------------------------------------------------------------------------------
// OStoreDB::getRetrieveJobs()
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::RetrieveJob> > OStoreDB::getRetrieveJobs() const {
  // We will walk all the tapes to get the jobs.
  std::map<std::string, std::list<common::dataStructures::RetrieveJob> > ret;
  RootEntry re(m_objectStore);
  ScopedSharedLock rel(re);
  re.fetch();
  auto rql=re.dumpRetrieveQueues();
  rel.release();
  for (auto & rqp: rql) {
    // This implementation gives imperfect consistency. Do we need better? (If so, TODO: improve).
    // This implementation is racy, so we tolerate that the queue is gone.
    try {
      RetrieveQueue rq(rqp.address, m_objectStore);
      ScopedSharedLock rqlock(rq);
      rq.fetch();
      for (auto & j: rq.dumpJobs()) {
        ret[rqp.vid].push_back(common::dataStructures::RetrieveJob());
        try {
          auto & jd=ret[rqp.vid].back();
          RetrieveRequest rr(j.address, m_objectStore);
          ScopedSharedLock rrl(rr);
          rr.fetch();
          jd.request=rr.getSchedulerRequest();
          for (auto & tf: rr.getArchiveFile().tapeFiles) {
            jd.tapeCopies[tf.second.vid].first=tf.second.copyNb;
            jd.tapeCopies[tf.second.vid].second=tf.second;
          }
        } catch (...) {
          // The request vanished or is unreadable: drop the half-filled entry.
          ret[rqp.vid].pop_back();
        }
      }
    } catch (...) {}
  }
  return ret;
}

//------------------------------------------------------------------------------
// OStoreDB::getDriveStates()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::DriveState> OStoreDB::getDriveStates() const {
  RootEntry re(m_objectStore);
  ScopedSharedLock rel(re);
  re.fetch();
  auto driveRegisterAddress = re.getDriveRegisterAddress();
  objectstore::DriveRegister dr(driveRegisterAddress, m_objectStore);
  objectstore::ScopedExclusiveLock drl(dr);
  dr.fetch();
  return dr.getAllDrivesState();
}

//------------------------------------------------------------------------------
// OStoreDB::setDesiredDriveState()
//------------------------------------------------------------------------------
void OStoreDB::setDesiredDriveState(const std::string& drive, const common::dataStructures::DesiredDriveState & desiredState) {
  RootEntry re(m_objectStore);
  ScopedSharedLock rel(re);
  re.fetch();
  auto driveRegisterAddress = re.getDriveRegisterAddress();
  objectstore::DriveRegister dr(driveRegisterAddress, m_objectStore);
  objectstore::ScopedExclusiveLock drl(dr);
  dr.fetch();
  auto driveState = dr.getDriveState(drive);
  driveState.desiredDriveState = desiredState;
  dr.setDriveState(driveState);
  dr.commit();
}

//------------------------------------------------------------------------------
// OStoreDB::reportDriveStatus()
//------------------------------------------------------------------------------
void OStoreDB::reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo,
  cta::common::dataStructures::MountType mountType, common::dataStructures::DriveStatus status,
  time_t reportTime, uint64_t mountSessionId, uint64_t byteTransfered,
  uint64_t filesTransfered, double latestBandwidth, const std::string& vid, 
  const std::string& tapepool) {
  using common::dataStructures::DriveStatus;
  // Lock the drive register and try to find the drive entry
  RootEntry re(m_objectStore);
  ScopedSharedLock rel(re);
  re.fetch();
  auto driveRegisterAddress = re.getDriveRegisterAddress();
  objectstore::DriveRegister dr(driveRegisterAddress, m_objectStore);
  objectstore::ScopedExclusiveLock drl(dr);
  dr.fetch();
  // Wrap all the parameters together for easier manipulation by sub-functions
  ReportDriveStatusInputs inputs;
  inputs.mountType = mountType;
  inputs.byteTransfered = byteTransfered;
  inputs.filesTransfered = filesTransfered;
  inputs.latestBandwidth = latestBandwidth;
  inputs.mountSessionId = mountSessionId;
  inputs.reportTime = reportTime;
  inputs.status = status;
  inputs.vid = vid;
  inputs.tapepool = tapepool;
  updateDriveStatusInRegitry(dr, driveInfo, inputs);
  dr.commit();
}

void OStoreDB::updateDriveStatusInRegitry(objectstore::DriveRegister& dr, 
  const common::dataStructures::DriveInfo& driveInfo, const ReportDriveStatusInputs& inputs) {
  using common::dataStructures::DriveStatus;
  // The drive state might not be present, in which case we have to fill it up with default values.
  cta::common::dataStructures::DriveState driveState;
  try { 
    driveState = dr.getDriveState(driveInfo.driveName);
  } catch (cta::exception::Exception & ex) {
    // The drive is missing from the registry. We have to fill the state with
    // default values for the fields that will not be set below.
    driveState.driveName = driveInfo.driveName;
    // host will be reset anyway.
    // logical library will be reset anyway.
    driveState.mountType = common::dataStructures::MountType::NoMount;
    driveState.driveStatus = common::dataStructures::DriveStatus::Unknown;
    driveState.sessionId = 0;
    driveState.bytesTransferedInSession = 0;
    driveState.filesTransferedInSession = 0;
    driveState.latestBandwidth = 0;
    driveState.sessionStartTime = 0;
    driveState.mountStartTime = 0;
    driveState.transferStartTime = 0;
    driveState.unloadStartTime = 0;
    driveState.unmountStartTime = 0;
    driveState.drainingStartTime = 0;
    driveState.downOrUpStartTime = 0;
    driveState.cleanupStartTime = 0;
    driveState.lastUpdateTime = 0;
    driveState.startStartTime = 0;
    driveState.desiredDriveState.up = (inputs.status != DriveStatus::Down);
    driveState.desiredDriveState.forceDown = false;
    driveState.currentTapePool = "";
    driveState.currentVid = "";
  }
  // Set the parameters that we always set
  driveState.host = driveInfo.host;
  driveState.logicalLibrary = driveInfo.logicalLibrary;
  // Set the status
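  // Each setDriveXxx() helper below updates the timing fields (session, mount,
  // transfer, etc.) that are relevant to the transition into the reported state.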
  switch (inputs.status) {
    case DriveStatus::Down:
      setDriveDown(driveState, inputs);
      break;
    case DriveStatus::Up:
      setDriveUpOrMaybeDown(driveState, inputs);
      break;
    case DriveStatus::Starting:
      setDriveStarting(driveState, inputs);
      break;
    case DriveStatus::Mounting:
      setDriveMounting(driveState, inputs);
      break;
    case DriveStatus::Transfering:
      setDriveTransfering(driveState, inputs);
      break;
    case DriveStatus::Unloading: