/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

// C system headers.
#include <syscall.h>
#include <unistd.h>

// C++ standard library.
#include <exception>
#include <future>
#include <list>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <typeinfo>

// Project headers.
#include "common/helgrind_annotator.hpp"
#include "common/log/LogContext.hpp"
#include "common/threading/Mutex.hpp"
#include "common/threading/MutexLocker.hpp"
#include "objectstore/ArchiveQueue.hpp"
#include "objectstore/ArchiveRequest.hpp"
#include "objectstore/Helpers.hpp"
#include "objectstore/RetrieveQueue.hpp"
#include "objectstore/RetrieveRequest.hpp"

namespace cta { 
// Forward declaration
class OStoreDB;
  namespace ostoredb {
/**
 * A container to which the ownership of the archive queue (and more important,
 * its lock) will be passed. This container will be passed as a shared pointer
 * to the caller of sharedAddToArchiveQueue, so they can delete their copy AFTER
 * updating the ownership of their requests.
 */
45
template <class Queue, class Request>
Eric Cano's avatar
Eric Cano committed
46
class SharedQueueLock {
47
48
  template <class, class>
  friend class MemQueue;
Eric Cano's avatar
Eric Cano committed
49
50
51
52
53
public:
  SharedQueueLock(log::LogContext & logContext): m_logContext(logContext) {}
  ~SharedQueueLock();
private:
  std::unique_ptr<objectstore::ScopedExclusiveLock> m_lock;
54
  std::unique_ptr<Queue> m_queue;
55
  std::string m_queueIndex;
56
  std::shared_ptr<std::promise<void>> m_promiseForSuccessor;
57
  log::LogContext m_logContext;
Eric Cano's avatar
Eric Cano committed
58
59
  utils::Timer m_timer;
};
60

61
62
template <class Queue, class Request>
SharedQueueLock<Queue, Request>::~SharedQueueLock() {
63
  double waitTime = m_timer.secs(utils::Timer::resetCounter);
64
65
66
67
68
69
70
  bool skipQueuesTrim=false;
  if (m_lock.get() && m_lock->isLocked()) {
    m_lock->release();
  } else {
    m_logContext.log(log::ERR, "In SharedQueueLock::~SharedQueueLock(): the lock was not present or not locked. Skipping unlock.");
    skipQueuesTrim=true;
  }
71
  double queueUnlockTime = m_timer.secs(utils::Timer::resetCounter);
72
  // The next update of the queue can now proceed
73
74
75
76
77
78
79
80
  if (m_promiseForSuccessor.get()) {
    ANNOTATE_HAPPENS_BEFORE(m_promiseForSuccessor.get());
    m_promiseForSuccessor->set_value();
  } else {
    m_logContext.log(log::ERR, "In SharedQueueLock::~SharedQueueLock(): the promise was not present. Skipping value setting.");
    skipQueuesTrim=true;
  }
  if (skipQueuesTrim) return;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
  double successorUnlockTime = m_timer.secs(utils::Timer::resetCounter);
  // We can now cleanup the promise/future couple if they were not picked up to trim the maps.
  // A next thread finding them unlocked or absent will be equivalent.
  threading::MutexLocker globalLock(MemQueue<Queue, Request>::g_mutex);
  // If there is an promise AND it is ours, we remove it.
  try {
    if (MemQueue<Queue, Request>::g_promises.at(m_queueIndex).get() == m_promiseForSuccessor.get()) {
      MemQueue<Queue, Request>::g_futures.erase(m_queueIndex);
      MemQueue<Queue, Request>::g_promises.erase(m_queueIndex);
    }
  } catch (std::out_of_range &) {}
  double inMemoryQueuesCleanupTime = m_timer.secs();
  log::ScopedParamContainer params(m_logContext);
  params.add("objectQueue", m_queue->getAddressIfSet())
        .add("waitTime", waitTime)
        .add("queueUnlockTime", queueUnlockTime)
        .add("successorUnlockTime", successorUnlockTime)
        .add("inMemoryQueuesCleanupTime", inMemoryQueuesCleanupTime);
  m_logContext.log(log::INFO, "In SharedQueueLock::~SharedQueueLock(): unlocked the archive queue pointer.");
100
}   
101
102

template <class Request, class Queue>
103
104
105
class MemQueueRequest {
  template <class, class>
  friend class MemQueue;
106
public:
107
108
109
  MemQueueRequest(typename Request::JobDump & job,
    Request & archiveRequest): m_job(job), m_request(archiveRequest), m_tid(::syscall(SYS_gettid)) {}
  virtual ~MemQueueRequest() {
110
    threading::MutexLocker ml(m_mutex);
111
  }
112
private:
113
114
  typename Request::JobDump & m_job;
  Request & m_request;
115
  std::promise<void> m_promise;
116
  std::shared_ptr<SharedQueueLock<Queue, Request>> m_returnValue;
117
  // Mutex protecting users against premature deletion
118
  threading::Mutex m_mutex;
119
120
  // Helper for debugging
  pid_t m_tid;
121
122
};

123
124
template <class Request, class Queue>
class MemQueue {
125
126
  template <class, class>
  friend class SharedQueueLock;
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
public:
  /**
   * This function adds ArchiveRequeuest to an ArchiveQueue in batch.
   * A static function that will either create the shared queue for a given
   * tape pool if none exist, of add the job to it otherwise. When adding
   * the job, the first calling thread will be woken up if enough jobs have been
   * accumulated.
   * The creation and action are done using the global lock, which should be
   * sufficiently fast as we work in memory.
   * All calls sharing the same batch will either succeed of throw the same 
   * exception.
   * The address of the archive queue object will be updated in both parameters
   * (job and archiveRequest).
   * 
   * @param job to be added to the ArchiveQueue (contains the tape pool name)
142
   * @param request the request itself.
143
144
145
   * @param oStoreDB reference to the object store, allowing creation of the queue
   * if needed
   * @param logContext log context to log addition of jobs to the queue.
146
   */
147
  static std::shared_ptr<SharedQueueLock<Queue, Request>> sharedAddToQueue(typename Request::JobDump & job,
148
    const std::string & queueIndex, Request & request, OStoreDB & oStoreDB, log::LogContext & logContext);
149
150
151
  
private:
  /** Mutex that should be locked before attempting any operation */
152
  threading::Mutex m_mutex;
153
  /** Add the object */
154
155
  void add(std::shared_ptr<MemQueueRequest<Request, Queue>>& request);
  std::list<std::shared_ptr<MemQueueRequest<Request, Queue>>> m_requests;
156
  static threading::Mutex g_mutex;
157
158
159
160
  /**
   * A set of per tape pool queues. Their presence indicates that a queue is currently being built up and new objects to be queued
   * should piggy back on it.
   */
161
  static std::map<std::string, std::shared_ptr<MemQueue>> g_queues;
162
163
164
165
166
167
168
169
170
171
172
173
174
175
  /**
   * A set of per tape pool promises. Their presence indicates that a previous queue is currently pending commit. Threads creating
   * a new queue (when there is no queue, yet a promise is present) will wait on them to prevent contention with previous queue update.
   * When a thread creates a queue (there was none) and finds no promise, it will create is own queue and promise, post the queue,
   * immediately push the content of the queue (which is likely, but not guaranteed to be just its own job), and release the promise
   * when done. 
   * At completion, if the initial promise is still present in the map, the creating thread will remove it, as no promise of a 
   * fulfilled promise are equivalent.
   */
  static std::map<std::string, std::shared_ptr<std::promise<void>>> g_promises;
  /**
   * A set of futures, extracted from the corresponding g_promises.
   */
  static std::map<std::string, std::future<void>> g_futures;
176
177
  
  /** Helper function for sharedAddToArchiveQueue */
178
  static std::shared_ptr<SharedQueueLock<Queue, Request>> sharedAddToNewQueue(typename Request::JobDump & job, const std::string & queueIndex,
179
    Request & request, OStoreDB & oStoreDB, log::LogContext & logContext, threading::MutexLocker &globalLock);
180
  
181
182
183
184
185
186
  /** Struct holding the job plus request data */
  struct JobAndRequest {
    typename Request::JobDump & job;
    Request & request;
  };
  
187
  /** Helper function handling the difference between archive and retrieve (vid vs tapepool) */
188
189
  static void specializedAddJobsToQueueAndCommit(std::list<JobAndRequest> & jobsToAdd, Queue & queue, 
    objectstore::AgentReference & agentReference, log::LogContext & logContext);
190
191
192
  
  /** Helper function updating the cached retrieve queue stats. Noop for archive queues */
  static void specializedUpdateCachedQueueStats(Queue &queue);
193
194
};

195
196
197
198
199
200
201
202
203
204
205
206
207
template <class Request, class Queue>
cta::threading::Mutex MemQueue<Request, Queue>::g_mutex;

template <class Request, class Queue>
std::map<std::string, std::shared_ptr<MemQueue<Request, Queue>>> MemQueue<Request, Queue>::g_queues;

template <class Request, class Queue>
std::map<std::string, std::shared_ptr<std::promise<void>>> MemQueue<Request, Queue>::g_promises;

template <class Request, class Queue>
std::map<std::string, std::future<void>> MemQueue<Request, Queue>::g_futures;

template <class Request, class Queue>
208
std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::sharedAddToQueue(typename Request::JobDump& job, 
209
    const std::string & queueIndex, Request& request, OStoreDB & oStoreDB, log::LogContext & logContext) {
210
211
212
213
214
  // 1) Take the global lock (implicit in the constructor)
  threading::MutexLocker globalLock(g_mutex);
  std::shared_ptr<MemQueue> q;
  try {
    // 2) Determine if the queue exists already or not
215
    q = g_queues.at(queueIndex);
216
217
  } catch (std::out_of_range &) {
    // The queue is not there. We will just create a new one.
218
    return sharedAddToNewQueue(job, queueIndex, request, oStoreDB, logContext, globalLock); 
219
220
221
222
  }
  // It does: we just ride the train: queue ourselves
  // Lock the queue.
  threading::MutexLocker ulq(q->m_mutex);
223
  std::shared_ptr<MemQueueRequest<Request, Queue>> maqr(new MemQueueRequest<Request, Queue>(job, request));
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
  // Extract the future before the other thread gets a chance to touch the promise.
  auto resultFuture = maqr->m_promise.get_future();
  q->add(maqr);
  // Release the queue, forget the queue, and release the global lock
  ulq.unlock();
  q.reset();
  globalLock.unlock();
  // Wait for our request completion (this could throw, if there was a problem)
  resultFuture.get();
  ANNOTATE_HAPPENS_AFTER(&maqr->m_promise);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&maqr->m_promise);
  auto ret=maqr->m_returnValue;
  __attribute__((unused)) auto debugMaqr=maqr.get();
  maqr.reset();
  return ret;
}

template <class Request, class Queue>
242
std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::sharedAddToNewQueue(
243
  typename Request::JobDump& job, const std::string & queueIndex, Request& request, 
244
245
246
  OStoreDB& oStoreDB, log::LogContext& logContext, threading::MutexLocker &globalLock) {
  utils::Timer timer;
  // Re-check the queue is not there
247
  if (g_queues.end() != g_queues.find(queueIndex)) {
248
    throw cta::exception::Exception("In MemQueue::sharedAddToNewQueue(): the queue is present, while it should not!");
249
250
  }
  // Create the queue and reference it.
251
  auto maq = (g_queues[queueIndex] = std::make_shared<MemQueue>());
252
253
  // Get the promise from the previous future (if any). This will create the slot
  // as a side effect, but we will populate it right after.
254
  std::shared_ptr<std::promise<void>> promiseFromPredecessor = g_promises[queueIndex];
255
256
257
258
  // If the promise from predecessor was present, also extract the future.
  std::future<void> futureFromPredecessor;
  if (promiseFromPredecessor.get()) {
    try {
259
      futureFromPredecessor = std::move(g_futures.at(queueIndex));
260
    } catch (std::out_of_range &) {
261
      throw cta::exception::Exception("In MemQueue::sharedAddToNewQueue(): the future is not present, while it should!");
262
263
264
    }
  }
  // Create the promise and future for successor.
265
266
  std::shared_ptr<std::promise<void>> promiseForSuccessor = (g_promises[queueIndex] = std::make_shared<std::promise<void>>());
  g_futures[queueIndex] = promiseForSuccessor->get_future();
267
268
269
270
271
272
273
274
275
276
277
278
279
  // Release the global list
  globalLock.unlock();
  // Wait on out future, if necessary
  if (promiseFromPredecessor.get()) {
    futureFromPredecessor.get();
    ANNOTATE_HAPPENS_AFTER(promiseFromPredecessor.get());
    ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(promiseFromPredecessor.get());
  }
  // We are now clear to update the queue in object store.
  // Re-take the global and make sure the queue is not referenced anymore.
  globalLock.lock();
  // Remove the queue for our tape pool. It should be present.
  try {
280
    if (g_queues.at(queueIndex).get() != maq.get()) {
281
      throw cta::exception::Exception("In MemQueue::sharedAddToNewQueue(): the queue is not ours, while it should!");
282
283
    }
  } catch (std::out_of_range &) {
284
    throw cta::exception::Exception("In MemQueue::sharedAddToNewQueue(): the queue is not present, while it should!");
285
286
  }
  // Checks are fine, let's just drop the queue from the map
287
  g_queues.erase(queueIndex);
288
289
290
291
292
293
294
295
  // Lock the queue, to make sure the last user is done posting. We could do this after
  // releasing the global lock, but helgrind objects.
  threading::MutexLocker ulq(maq->m_mutex);
  // Our mem queue is now unreachable so we can let the global part go
  globalLock.unlock();
  double waitTime = timer.secs(utils::Timer::resetCounter);
  // We can now proceed with the queuing of the jobs in the object store.
  try {
296
    std::shared_ptr<SharedQueueLock<Queue, Request>> ret(new SharedQueueLock<Queue, Request>(logContext));
297
    ret->m_promiseForSuccessor=promiseForSuccessor;
298
    ret->m_queueIndex=queueIndex;
299
    ret->m_queue.reset(new Queue(oStoreDB.m_objectStore));
300
    ret->m_lock.reset(new objectstore::ScopedExclusiveLock);
301
    auto & queue = *ret->m_queue;
302
    auto & aql = *ret->m_lock;
Eric Cano's avatar
Eric Cano committed
303
304
    objectstore::Helpers::getLockedAndFetchedQueue<Queue>(queue, aql, 
            *oStoreDB.m_agentReference, queueIndex, objectstore::QueueType::LiveJobs, logContext);
305
    double getFetchedQueueTime = timer.secs(utils::Timer::resetCounter);
306
307
308
309
310
    size_t qJobsBefore=queue.dumpJobs().size();
    uint64_t qBytesBefore=0;
    for (auto j: queue.dumpJobs()) {
      qBytesBefore+=j.size;
    }
311
    size_t addedJobs=1;
312
313
    // Build the list of jobs to add to the queue
    std::list<JobAndRequest> jta;
314
    // First add the job for this thread
315
    jta.push_back({job, request});
316
317
318
319
320
    // We are done with the queue: release the lock to make helgrind happy.
    ulq.unlock();
    // We do the same for all the queued requests
    for (auto &maqr: maq->m_requests) {
      // Add the job
321
      jta.push_back({maqr->m_job, maqr->m_request});
322
323
      addedJobs++;
    }
324
    // Actually ass the jobs.
325
    specializedAddJobsToQueueAndCommit(jta, queue, *oStoreDB.m_agentReference, logContext);
326
    double queueProcessAndCommitTime = timer.secs(utils::Timer::resetCounter);
327
328
    // Update the cache stats in memory as we hold the queue.
    specializedUpdateCachedQueueStats(queue);
329
    double cacheUpdateTime = timer.secs(utils::Timer::resetCounter);
330
    // Log
331
332
333
334
335
    size_t qJobsAfter=queue.dumpJobs().size();
    uint64_t qBytesAfter=0;
    for (auto j: queue.dumpJobs()) {
      qBytesAfter+=j.size;
    }
336
337
    {
      log::ScopedParamContainer params(logContext);
338
339
340
341
342
      if (typeid(Queue) == typeid(objectstore::ArchiveQueue)) {
        params.add("type", "Archive")
              .add("tapepool", queueIndex);
      } else if (typeid(Queue) == typeid(objectstore::RetrieveQueue)) {
        params.add("type", "Retrieve")
343
              .add("tapeVid", queueIndex);
344
      }
345
      params.add("objectQueue", queue.getAddressIfSet())
346
347
348
349
            .add("jobsBefore", qJobsBefore)
            .add("jobsAfter", qJobsAfter)
            .add("bytesBefore", qBytesBefore)
            .add("bytesAfter", qBytesAfter)
350
351
            .add("addedJobs", addedJobs)
            .add("waitTime", waitTime)
352
            .add("getFetchedQueueTime", getFetchedQueueTime)
353
            .add("queueProcessAndCommitTime", queueProcessAndCommitTime)
354
            .add("cacheUpdateTime", cacheUpdateTime) 
355
            .add("totalEnqueueTime", getFetchedQueueTime + queueProcessAndCommitTime 
356
                                    + cacheUpdateTime + timer.secs());
357
      logContext.log(log::INFO, "In MemQueue::sharedAddToNewQueue(): added batch of jobs to the queue.");
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
    }
    // We will also count how much time we mutually wait for the other threads.
    ret->m_timer.reset();
    // And finally release all the user threads
    for (auto &maqr: maq->m_requests) {
      {
        threading::MutexLocker (maqr->m_mutex);
        ANNOTATE_HAPPENS_BEFORE(&maqr->m_promise);
        maqr->m_returnValue=ret;
        maqr->m_promise.set_value();
      }
    }
    // Done!
    return ret;
  } catch (...) {
    try {
      std::rethrow_exception(std::current_exception());
    } catch (cta::exception::Exception &ex) {
      log::ScopedParamContainer params(logContext);
      params.add("message", ex.getMessageValue());
378
      logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): got an exception writing. Will propagate to other threads.");
379
380
381
382
    } catch (std::exception & ex) {
      log::ScopedParamContainer params(logContext);
      params.add("exceptionWhat", ex.what());
      logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): got a standard exception writing. Will propagate to other threads.");
383
    } catch (...) {
384
      logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): got an unknown exception writing. Will propagate to other threads.");
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
    }
    size_t exceptionsNotPassed = 0;
    // Something went wrong. We should inform the other threads
    for (auto & maqr: maq->m_requests) {
      try {
        threading::MutexLocker (maqr->m_mutex);
        ANNOTATE_HAPPENS_BEFORE(&maqr->m_promise);
        maqr->m_promise.set_exception(std::current_exception());
      } catch (...) {
        exceptionsNotPassed++;
      }
    }
    // And we inform the caller in our thread too
    if (exceptionsNotPassed) {
      try {
        std::rethrow_exception(std::current_exception());
      } catch (std::exception & ex) {
        std::stringstream err;
403
        err << "In MemQueue::sharedAddToNewQueue(), in main thread, failed to notify "
404
405
406
407
408
            << exceptionsNotPassed << " other threads out of  " << maq->m_requests.size()
            << " : " << ex.what();
        log::ScopedParamContainer params(logContext);
        params.add("what", ex.what())
              .add("exceptionsNotPassed", exceptionsNotPassed);
409
        logContext.log(log::ERR, "In MemQueue::sharedAddToNewQueue(): Failed to propagate exceptions to other threads.");
410
411
412
413
414
415
416
417
418

        throw cta::exception::Exception(err.str());
      }
    } else
      throw;
  } 
}

template <class Request, class Queue>
419
void MemQueue<Request, Queue>::add(std::shared_ptr<MemQueueRequest<Request, Queue>>& request) {
420
421
422
  m_requests.emplace_back(request); 
}

423
424
425
typedef MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue> MemArchiveQueue;
typedef MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue> MemRetrieveQueue;

}} // namespace cta::ostoredb