Helpers.cpp 26.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "Helpers.hpp"
#include "Backend.hpp"
#include "ArchiveQueue.hpp"
#include "AgentReference.hpp"
#include "RetrieveQueue.hpp"
#include "RootEntry.hpp"
#include "DriveRegister.hpp"
#include "DriveState.hpp"
#include "catalogue/Catalogue.hpp"
#include "common/exception/NonRetryableError.hpp"
#include <chrono>
#include <ctime>
#include <exception>
#include <future>
#include <random>
#include <typeinfo>

namespace cta { namespace objectstore {

//------------------------------------------------------------------------------
34
// Helpers::getLockedAndFetchedArchiveQueue()
35
//------------------------------------------------------------------------------
36
37
template <>
void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
38
  ScopedExclusiveLock& archiveQueueLock, AgentReference & agentReference,
Eric Cano's avatar
Eric Cano committed
39
  const std::string& tapePool, QueueType queueType, log::LogContext & lc) {
40
41
42
43
44
45
  // TODO: if necessary, we could use a singleton caching object here to accelerate
  // lookups.
  // Getting a locked AQ is the name of the game.
  // Try and find an existing one first, create if needed
  Backend & be = archiveQueue.m_objectStore;
  for (size_t i=0; i<5; i++) {
46
    double rootFetchNoLockTime = 0;
47
48
    double rootRelockExclusiveTime = 0;
    double rootUnlockExclusiveTime = 0;
49
    double rootQueueDereferenceTime = 0;
50
51
    double rootRefetchTime = 0;
    double addOrGetQueueandCommitTime = 0;
52
53
    double queueLockTime = 0;
    double queueFetchTime = 0;
54
    utils::Timer t;
55
    {
56
57
58
      RootEntry re(be);
      re.fetchNoLock();
      rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
59
      try {
Eric Cano's avatar
Eric Cano committed
60
        archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool, queueType));
61
62
      } catch (cta::exception::Exception & ex) {
        ScopedExclusiveLock rexl(re);
63
        rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
64
        re.fetch();
65
        rootRefetchTime = t.secs(utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
66
        archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, agentReference, queueType, lc));
67
        addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
68
69
      }
    }
70
    if (rootRelockExclusiveTime)
71
      rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter);
72
73
    try {
      archiveQueueLock.lock(archiveQueue);
74
      queueLockTime = t.secs(utils::Timer::resetCounter);
75
      archiveQueue.fetch();
76
      queueFetchTime = t.secs(utils::Timer::resetCounter);
77
78
79
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
            .add("queueObject", archiveQueue.getAddressIfSet())
80
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
81
82
83
84
85
86
87
            .add("rootRelockExclusiveTime", rootRelockExclusiveTime)
            .add("rootRefetchTime", rootRefetchTime)
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): Successfully found and locked an archive queue.");
88
89
90
91
92
93
      return;
    } catch (cta::exception::Exception & ex) {
      // We have a (rare) opportunity for a race condition, where we identify the
      // queue and it gets deleted before we manage to lock it.
      // The locking of fetching will fail in this case.
      // We hence allow ourselves to retry a couple times.
94
95
      // We also need to make sure the lock on the queue is released (it is in
      // an object and hence not scoped).
96
97
98
99
100
101
102
103
104
105
106
107
108
109
      // We should also deal with the case where a queue was deleted but left 
      // referenced in the root entry. We will try to clean up if necessary.
      // Failing to do this, we will spin and exhaust all of our retries.
      // We will do this if this is not the first attempt (i.e. failing again
      // in a retry).
      if (i && typeid(ex) == typeid(cta::objectstore::Backend::NoSuchObject)) {
        // The queue has been proven to not exist. Let's make sure we de-reference
        // it form the root entry.
        RootEntry re(be);
        ScopedExclusiveLock rexl(re);
        rootRelockExclusiveTime += t.secs(utils::Timer::resetCounter);
        re.fetch();
        rootRefetchTime += t.secs(utils::Timer::resetCounter);
        try {
Eric Cano's avatar
Eric Cano committed
110
          re.removeArchiveQueueAndCommit(tapePool, queueType, lc);
111
112
113
114
115
116
117
118
119
     
          rootQueueDereferenceTime += t.secs(utils::Timer::resetCounter);
          log::ScopedParamContainer params(lc);
          params.add("tapepool", tapePool)
                .add("queueObject", archiveQueue.getAddressIfSet())
                .add("exceptionMsg", ex.getMessageValue());
          lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): removed reference to gone archive queue from root entry.");
        } catch (...) { /* Failing here is not fatal. We can get an exception if the queue was deleted in the meantime */ } 
      }
120
      if (archiveQueueLock.isLocked()) archiveQueueLock.release();
121
122
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
123
            .add("exceptionMessage", ex.getMessageValue())
124
            .add("queueObject", archiveQueue.getAddressIfSet())
125
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
126
            .add("rootRefetchTime", rootRefetchTime)
127
            .add("rootQueueDereferenceTime", rootQueueDereferenceTime)
128
129
130
131
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
132
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): failed to fetch an existing queue. Retrying.");
133
      archiveQueue.resetAddress();
134
      continue;
135
136
137
    } catch (...) {
      // Also release the lock if needed here.
      if (archiveQueueLock.isLocked()) archiveQueueLock.release();
138
      archiveQueue.resetAddress();
139
      throw;
140
141
    }
  }
142
143
  // Also release the lock if needed here.
  if (archiveQueueLock.isLocked()) archiveQueueLock.release();
144
  archiveQueue.resetAddress();
145
  throw cta::exception::Exception(std::string(
146
      "In OStoreDB::getLockedAndFetchedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ")
147
148
149
      + tapePool);
}

150

151
//------------------------------------------------------------------------------
152
153
// Helpers::getLockedAndFetchedRetrieveQueue()
//------------------------------------------------------------------------------
154
template <>
155
156
void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQueue,
  ScopedExclusiveLock& retrieveQueueLock, AgentReference& agentReference,
Eric Cano's avatar
Eric Cano committed
157
  const std::string& vid, QueueType queueType, log::LogContext & lc) {
158
159
160
161
162
163
  // TODO: if necessary, we could use a singleton caching object here to accelerate
  // lookups.
  // Getting a locked AQ is the name of the game.
  // Try and find an existing one first, create if needed
  Backend & be = retrieveQueue.m_objectStore;
  for (size_t i=0; i<5; i++) {
164
165
166
    double rootFetchNoLockTime = 0;
    double rootRelockExclusiveTime = 0;
    double rootUnlockExclusiveTime = 0;
167
    double rootQueueDereferenceTime = 0;
168
169
170
171
172
    double rootRefetchTime = 0;
    double addOrGetQueueandCommitTime = 0;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    utils::Timer t;
173
174
    {
      RootEntry re (be);
175
176
      re.fetchNoLock();
      rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
177
      try {
Eric Cano's avatar
Eric Cano committed
178
        retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid, queueType));
179
180
      } catch (cta::exception::Exception & ex) {
        ScopedExclusiveLock rexl(re);
181
        rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
182
        re.fetch();
183
        rootRefetchTime = t.secs(utils::Timer::resetCounter);
Eric Cano's avatar
Eric Cano committed
184
        retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference, queueType, lc));
185
        addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
186
187
      }
    }
188
189
    if (rootRelockExclusiveTime)
      rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter);
190
191
    try {
      retrieveQueueLock.lock(retrieveQueue);
192
      queueLockTime = t.secs(utils::Timer::resetCounter);
193
      retrieveQueue.fetch();
194
195
196
197
198
199
200
201
202
203
204
205
      queueFetchTime = t.secs(utils::Timer::resetCounter);
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
            .add("queueObject", retrieveQueue.getAddressIfSet())
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
            .add("rootRelockExclusiveTime", rootRelockExclusiveTime)
            .add("rootRefetchTime", rootRefetchTime)
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): Successfully found and locked a retrieve queue.");
206
207
208
209
210
211
      return;
    } catch (cta::exception::Exception & ex) {
      // We have a (rare) opportunity for a race condition, where we identify the
      // queue and it gets deleted before we manage to lock it.
      // The locking of fetching will fail in this case.
      // We hence allow ourselves to retry a couple times.
212
213
      // We also need to make sure the lock on the queue is released (it is in
      // an object and hence not scoped).
214
215
216
217
218
219
220
221
222
223
224
225
226
227
      // We should also deal with the case where a queue was deleted but left 
      // referenced in the root entry. We will try to clean up if necessary.
      // Failing to do this, we will spin and exhaust all of our retries.
      // We will do this if this is not the first attempt (i.e. failing again
      // in a retry).
      if (i && typeid(ex) == typeid(cta::objectstore::Backend::NoSuchObject)) {
        // The queue has been proven to not exist. Let's make sure we de-reference
        // it form the root entry.
        RootEntry re(be);
        ScopedExclusiveLock rexl(re);
        rootRelockExclusiveTime += t.secs(utils::Timer::resetCounter);
        re.fetch();
        rootRefetchTime += t.secs(utils::Timer::resetCounter);
        try {
Eric Cano's avatar
Eric Cano committed
228
          re.removeRetrieveQueueAndCommit(vid, queueType, lc);
229
230
231
     
          rootQueueDereferenceTime += t.secs(utils::Timer::resetCounter);
          log::ScopedParamContainer params(lc);
232
          params.add("tapeVid", vid)
233
234
235
236
237
                .add("queueObject", retrieveQueue.getAddressIfSet())
                .add("exceptionMsg", ex.getMessageValue());
          lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): removed reference to gone retrieve queue from root entry.");
        } catch (...) { /* Failing here is not fatal. We can get an exception if the queue was deleted in the meantime */ } 
      }
238
239
240
241
242
243
244
      if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
            .add("exceptionMessage", ex.getMessageValue())
            .add("queueObject", retrieveQueue.getAddressIfSet())
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
            .add("rootRefetchTime", rootRefetchTime)
245
            .add("rootQueueDereferenceTime", rootQueueDereferenceTime)
246
247
248
249
250
251
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): failed to fetch an existing queue. Retrying.");
      retrieveQueue.resetAddress();
252
      continue;
253
254
255
256
257
    } catch (...) {
      // Also release the lock if needed here.
      if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
      retrieveQueue.resetAddress();
      throw;
258
259
    }
  }
260
261
262
  // Also release the lock if needed here.
  if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
  retrieveQueue.resetAddress();
263
264
265
266
267
268
269
  throw cta::exception::Exception(std::string(
      "In OStoreDB::getLockedAndFetchedRetrieveQueue(): failed to find or create and lock archive queue after 5 retries for vid: ")
      + vid);
}

//------------------------------------------------------------------------------
// Helpers::selectBestRetrieveQueue()
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
//------------------------------------------------------------------------------
std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candidateVids, cta::catalogue::Catalogue & catalogue,
    objectstore::Backend & objectstore) {
  // We will build the retrieve stats of the non-disable candidate vids here
  std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats;
  // A promise we create so we can make users wait on it.
  // Take the global lock
  cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex);
  // Create a promise just in case
  // Find the vids to be fetched (if any).
  for (auto & v: candidateVids) {
    try {
      // Out of range or outdated will be updated the same way.
      // If an update is in progress, we wait on it, and get the result after.
      // We have to release the global lock while doing so.
      if (g_retrieveQueueStatistics.at(v).updating) {
        // Cache is updating, we wait on update.
        auto updateFuture = g_retrieveQueueStatistics.at(v).updateFuture;
        grqsmLock.unlock();
        updateFuture.wait();
        grqsmLock.lock();
        if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled) {
          candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
        }
      } else {
        // We have a cache hit, check it's not stale.
        if (g_retrieveQueueStatistics.at(v).updateTime + c_retrieveQueueCacheMaxAge > time(nullptr))
          throw std::out_of_range("");
        // We're lucky: cache hit (and not stale)
        if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled)
          candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
      }
    } catch (std::out_of_range) {
      // We need to update the entry in the cache (miss or stale, we handle the same way).
      // We just update one vid at a time as doing several in parallel would be quite
      // hairy lock-wise (but give a slight performance boost).
      g_retrieveQueueStatistics[v].updating = true;
      std::promise<void> updatePromise;
      g_retrieveQueueStatistics[v].updateFuture = updatePromise.get_future();
      // Give other threads a chance to access the cache for other vids.
      grqsmLock.unlock();
      // Get the informations (stages, so we don't access the global variable without the mutex.
      auto tapeStatus=catalogue.getTapesByVid({v});
      // Build a minimal service  retrieve file queue criteria to query queues.
      common::dataStructures::RetrieveFileQueueCriteria rfqc;
      rfqc.archiveFile.tapeFiles[1].vid=v;
      auto queuesStats=Helpers::getRetrieveQueueStatistics(rfqc, {v}, objectstore);
      // We now have the data we need. Update the cache.
      grqsmLock.lock();
      g_retrieveQueueStatistics[v].updating=false;
      g_retrieveQueueStatistics[v].updateFuture=std::shared_future<void>();
      // Check we got the expected vid (and size of stats).
      if (queuesStats.size()!=1) 
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats.");
      if (queuesStats.front().vid!=v)
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats.");
      if (tapeStatus.size()!=1)
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for tapeStatus.");
      if (tapeStatus.begin()->first!=v)
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in tapeStatus.");
      g_retrieveQueueStatistics[v].stats = queuesStats.front();
      g_retrieveQueueStatistics[v].tapeStatus = tapeStatus.at(v);
      // Signal to potential waiters
      updatePromise.set_value();
      // Update our own candidate list if needed.
      if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled)
        candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
    }
  }
  // We now have all the candidates listed (if any).
  if (candidateVidsStats.empty())
341
    throw NoTapeAvailableForRetrieve("In Helpers::selectBestRetrieveQueue(): no tape available to recall from.");
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
  // Sort the tapes.
  candidateVidsStats.sort(SchedulerDatabase::RetrieveQueueStatistics::leftGreaterThanRight);
  // Get a list of equivalent best tapes
  std::set<std::string> shortlistVids;
  for (auto & s: candidateVidsStats) {
    if (!(s<candidateVidsStats.front()) && !(s>candidateVidsStats.front()))
      shortlistVids.insert(s.vid);
  }
  // If there is only one best tape, we're done
  if (shortlistVids.size()==1) return *shortlistVids.begin();
  // There are several equivalent entries, choose randomly among them.
  // First element will always be selected.
  // We need to get a random number [0, candidateVids.size() -1]
  std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
  std::uniform_int_distribution<size_t> distribution(0, candidateVids.size() -1);
  size_t index=distribution(dre);
  auto it=candidateVids.cbegin();
  std::advance(it, index);
  return *it;
}

363
364
365
366
//------------------------------------------------------------------------------
// Helpers::updateRetrieveQueueStatisticsCache()
//------------------------------------------------------------------------------
void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_t files, uint64_t bytes, uint64_t priority) {
367
368
369
370
  // We will not update the status of the tape if we already cached it (caller did not check),
  // We will also not update the update time, to force an update after a while.
  // If we update the entry while another thread is updating it, this is harmless (cache users will
  // anyway wait, and just not profit from our update.
371
372
373
374
  threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);
  try {
    g_retrieveQueueStatistics.at(vid).stats.filesQueued=files;
    g_retrieveQueueStatistics.at(vid).stats.bytesQueued=bytes;
375
    g_retrieveQueueStatistics.at(vid).stats.currentPriority = priority;
376
377
378
379
380
381
382
383
  } catch (std::out_of_range &) {
    // The entry is missing. We just create it.
    g_retrieveQueueStatistics[vid].stats.bytesQueued=bytes;
    g_retrieveQueueStatistics[vid].stats.filesQueued=files;
    g_retrieveQueueStatistics[vid].stats.currentPriority=priority;
    g_retrieveQueueStatistics[vid].stats.vid=vid;
    g_retrieveQueueStatistics[vid].tapeStatus.disabled=false;
    g_retrieveQueueStatistics[vid].tapeStatus.full=false;
384
385
    g_retrieveQueueStatistics[vid].updating = false;
    g_retrieveQueueStatistics[vid].updateTime = time(nullptr);
386
387
388
  }
}

389
390
391
392
393
394
395
396
397
398
399
//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatistics
//------------------------------------------------------------------------------
// Process-wide cache of retrieve queue statistics, keyed by vid. Within this
// file it is only accessed while holding g_retrieveQueueStatisticsMutex.
std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics{};

//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatisticsMutex
//------------------------------------------------------------------------------
// Guards g_retrieveQueueStatistics.
cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex{};

//------------------------------------------------------------------------------
400
// Helpers::getRetrieveQueueStatistics()
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
//------------------------------------------------------------------------------
std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueStatistics(
  const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider,
  objectstore::Backend & objectstore) {
  std::list<SchedulerDatabase::RetrieveQueueStatistics> ret;
  // Find the retrieve queues for each vid if they exist (absence is possible).
  RootEntry re(objectstore);
  ScopedSharedLock rel(re);
  re.fetch();
  rel.release();
  for (auto &tf:criteria.archiveFile.tapeFiles) {
    if (!vidsToConsider.count(tf.second.vid))
      continue;
    std::string rqAddr;
    try {
Eric Cano's avatar
Eric Cano committed
416
      std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid, QueueType::LiveJobs);
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
    } catch (cta::exception::Exception &) {
      ret.push_back(SchedulerDatabase::RetrieveQueueStatistics());
      ret.back().vid=tf.second.vid;
      ret.back().bytesQueued=0;
      ret.back().currentPriority=0;
      ret.back().filesQueued=0;
      continue;
    }
    RetrieveQueue rq(rqAddr, objectstore);
    ScopedSharedLock rql(rq);
    rq.fetch();
    rql.release();
    if (rq.getVid() != tf.second.vid)
      throw cta::exception::Exception("In OStoreDB::getRetrieveQueueStatistics(): unexpected vid for retrieve queue");
    ret.push_back(SchedulerDatabase::RetrieveQueueStatistics());
    ret.back().vid=rq.getVid();
    ret.back().currentPriority=rq.getJobsSummary().priority;
    ret.back().bytesQueued=rq.getJobsSummary().bytes;
    ret.back().filesQueued=rq.getJobsSummary().files;
  }
  return ret;
}

440
441
442
443
444
445
446
447
448
449
450
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedDriveState()
//------------------------------------------------------------------------------
// Locks (exclusively, through driveStateLock) and fetches the DriveState object
// that the drive register references for driveName. Ownership is fixed to the
// drive register when needed.
//
// Fast path: lock-free reads of the root entry and drive register, then lock and
// fetch the drive state. If the drive state is not owned by the register, the
// register is locked, the state re-looked-up and re-locked (proper lock order),
// and its owner set to the register.
// Slow path (any failure in the fast path): if doCreate is doNotCreate, throws
// NoSuchDrive. Otherwise the register is locked. If it references an existing
// object, that object is locked and fetched (ownership fixed). If not, any stale
// reference is dropped and a new drive state is created, registered, locked and
// fetched.
void Helpers::getLockedAndFetchedDriveState(DriveState& driveState, ScopedExclusiveLock& driveStateLock, 
  AgentReference& agentReference, const std::string& driveName, log::LogContext& lc, CreateIfNeeded doCreate) {
  Backend & be = driveState.m_objectStore;
  // Try and get the location of the drive state lock free (this should be most of the cases).
  try {
    RootEntry re(be);
    re.fetchNoLock();
    DriveRegister dr(re.getDriveRegisterAddress(), be);
    dr.fetchNoLock();
    driveState.setAddress(dr.getDriveAddress(driveName));
    driveStateLock.lock(driveState);
    driveState.fetch();
    if (driveState.getOwner() != dr.getAddressIfSet()) {
      std::string previouslySeenOwner=driveState.getOwner();
      // Special case: the drive state is not owned by the drive register.
      // As we were lock free, re-lock in proper order (register first).
      if (driveStateLock.isLocked()) driveStateLock.release();
      ScopedExclusiveLock drl(dr);
      dr.fetch();
      // Re-get the state (could have changed).
      driveState.resetAddress();
      driveState.setAddress(dr.getDriveAddress(driveName));
      driveStateLock.lock(driveState);
      driveState.fetch();
      // We have an exclusive lock on everything. We can now safely switch the
      // owner of the drive state to the drive register (there is no other
      // steady state ownership).
      log::ScopedParamContainer params (lc);
      params.add("driveRegisterObject", dr.getAddressIfSet())
            .add("driveStateObject", driveState.getAddressIfSet())
            .add("driveStateCurrentOwner", driveState.getOwner())
            .add("driveStatePreviouslySeenOwner", previouslySeenOwner);
      lc.log(log::WARNING, "In Helpers::getLockedAndFetchedDriveState(): unexpected owner for driveState (should be register, will fix it).");
      if (driveState.getOwner() != dr.getAddressIfSet()) {
        driveState.setOwner(dr.getAddressIfSet());
        driveState.commit();
      }
      // The drive register lock will be released automatically.
    }
    // We're done with good day scenarios.
    return;
  } catch (...) {
    // The fast path may have failed after setting the address and/or taking the
    // lock. Restore a clean state, so the steps below can set the address and
    // lock again (and so the caller gets a clean object if we throw).
    if (driveStateLock.isLocked()) driveStateLock.release();
    driveState.resetAddress();
    // If anything went wrong, we suppose we have to create the drive state and do
    // every step one at a time. This is far more costly (lock-wise)...
    // ... except if we were not supposed to create it.
    if (doCreate == CreateIfNeeded::doNotCreate) {
      throw NoSuchDrive("In Helpers::getLockedAndFetchedDriveState(): no such drive. Will not create it as instructed.");
    }
    RootEntry re(be);
    re.fetchNoLock();
    DriveRegister dr(re.getDriveRegisterAddress(), be);
    ScopedExclusiveLock drl(dr);
    dr.fetch();
    // Is the drive known to the register, and does its state object exist?
    // We work in this order because we are in one-off mode, so efficiency is not a concern.
    std::string dsAddress;
    bool driveKnown = true;
    try {
      dsAddress = dr.getDriveAddress(driveName);
    } catch (DriveRegister::NoSuchDrive &) {
      driveKnown = false;
    }
    if (driveKnown && !be.exists(dsAddress)) {
      // Stale reference to a gone object: drop it (committed with the new entry below).
      dr.removeDrive(driveName);
      driveKnown = false;
    }
    if (driveKnown) {
      driveState.setAddress(dsAddress);
      driveStateLock.lock(driveState);
      driveState.fetch();
      if (driveState.getOwner() != dr.getAddressIfSet()) {
        driveState.setOwner(dr.getAddressIfSet());
        driveState.commit();
      }
      return;
    }
    // We do need to create the drive state. The agent owns it until the register references it.
    driveState.setAddress(agentReference.nextId(std::string ("DriveStatus-")+driveName));
    driveState.initialize(driveName);
    agentReference.addToOwnership(driveState.getAddressIfSet(), be);
    driveState.setOwner(agentReference.getAgentAddress());
    driveState.insert();
    dr.setDriveAddress(driveName, driveState.getAddressIfSet());
    dr.commit();
    driveStateLock.lock(driveState);
    driveState.fetch();
    driveState.setOwner(dr.getAddressIfSet());
    driveState.commit();
    agentReference.removeFromOwnership(driveState.getAddressIfSet(), be);
  }
}


//------------------------------------------------------------------------------
// Helpers::getAllDriveStates()
//------------------------------------------------------------------------------
// Returns the state of every drive referenced by the drive register.
// The root entry and drive register are read lock-free, then all drive state
// objects are fetched lock-free and in parallel (asyncLockfreeFetch).
// If any fetch fails, the first failure is rethrown, but only after every fetch
// has been waited for.
// TODO: discrepancies between the register and the drive states are not cleaned up here.
std::list<cta::common::dataStructures::DriveState> Helpers::getAllDriveStates(Backend& backend, log::LogContext &lc) {
  std::list<cta::common::dataStructures::DriveState> ret;
  RootEntry re(backend);
  re.fetchNoLock();
  DriveRegister dr(re.getDriveRegisterAddress(), backend);
  dr.fetchNoLock();
  // std::list keeps element addresses stable while we append, and the fetchers
  // were started on those elements.
  std::list<DriveState> driveStates;
  std::list<std::unique_ptr<DriveState::AsyncLockfreeFetcher>> driveStateFetchers;
  for (auto & d: dr.getDriveAddresses()) {
    driveStates.emplace_back(d.driveStateAddress, backend);
    driveStateFetchers.emplace_back(driveStates.back().asyncLockfreeFetch());
  }
  // Wait for every fetch before propagating any failure, so that no fetch is
  // still outstanding when driveStates goes out of scope.
  std::exception_ptr firstFailure;
  for (auto & df: driveStateFetchers) {
    try {
      df->wait();
    } catch (...) {
      if (!firstFailure) firstFailure = std::current_exception();
    }
  }
  if (firstFailure) std::rethrow_exception(firstFailure);
  for (auto &d: driveStates) {
    ret.emplace_back(d.getState());
  }
  return ret;
}

}} // namespace cta::objectstore.