RetrieveRequest.cpp 25.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "RetrieveRequest.hpp"
#include "GenericObject.hpp"
21
#include "EntryLogSerDeser.hpp"
22
#include "MountPolicySerDeser.hpp"
23
#include "DiskFileInfoSerDeser.hpp"
24
#include "ArchiveFileSerDeser.hpp"
25
#include "RetrieveQueue.hpp"
26
#include "objectstore/cta.pb.h"
27
#include "Helpers.hpp"
28
#include <google/protobuf/util/json_util.h>
29
#include <cmath>
30

31
32
namespace cta { namespace objectstore {

33
34
35
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
36
// Creates a retrieve request handle for the object stored at `address`.
// The backend and the address are simply forwarded to the ObjectOps base.
RetrieveRequest::RetrieveRequest(const std::string& address, Backend& os)
  : ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>(os, address) {
}
39

40
41
42
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
43
// Creates a retrieve request from an already existing generic object.
// The new object shares the generic object's backend and takes over its header.
RetrieveRequest::RetrieveRequest(GenericObject& go)
  : ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>(go.objectStore()) {
  // Move the header of the generic object into this object...
  go.transplantHeader(*this);
  // ...and decode the payload carried by that header.
  getPayloadFromHeader();
}

51
//------------------------------------------------------------------------------
52
// RetrieveRequest::initialize()
53
//------------------------------------------------------------------------------
54
void RetrieveRequest::initialize() {
55
  // Setup underlying object
56
  ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>::initialize();
57
58
59
60
  // This object is good to go (to storage)
  m_payloadInterpreted = true;
}

61
//------------------------------------------------------------------------------
62
// RetrieveRequest::garbageCollect()
63
64
65
66
//------------------------------------------------------------------------------
void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc,
    cta::catalogue::Catalogue & catalogue) {
  checkPayloadWritable();
67
  utils::Timer t;
68
  // Check the request is indeed owned by the right owner.
69
70
71
72
73
74
75
  if (getOwner() != presumedOwner) {
    log::ScopedParamContainer params(lc);
    params.add("jobObject", getAddressIfSet())
          .add("presumedOwner", presumedOwner)
          .add("owner", getOwner());
    lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): no garbage collection needed.");
  }
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
  // The owner is indeed the right one. We should requeue the request if possible.
  // Find the vids for active jobs in the request (pending ones).
  using serializers::RetrieveJobStatus;
  std::set<RetrieveJobStatus> validStates({RetrieveJobStatus::RJS_Pending, RetrieveJobStatus::RJS_Selected});
  std::set<std::string> candidateVids;
  for (auto &j: m_payload.jobs()) {
    if (validStates.count(j.status())) {
      // Find the job details in tape file
      for (auto &tf: m_payload.archivefile().tapefiles()) {
        if (tf.copynb() == j.copynb()) {
          candidateVids.insert(tf.vid());
          goto found;
        }
      }
      {
        std::stringstream err;
        err << "In RetrieveRequest::garbageCollect(): could not find tapefile for copynb " << j.copynb();
        throw exception::Exception(err.str());
      }
    found:;
    }
  }
98
99
100
101
102
103
104
105
106
  // If there is no candidate, we cancel the job
  // TODO: in the future, we might queue it for reporting to EOS.
  if (candidateVids.empty()) {
    remove();
    log::ScopedParamContainer params(lc);
    params.add("jobObject", getAddressIfSet());
    lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): deleted job as no tape file is available for recall.");
    return;
  }
107
108
  // If we have to fetch the status of the tapes and queued for the non-disabled vids.
  auto bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore);
109
110
111
112
113
114
115
116
117
118
119
120
121
  // Find the corresponding tape file, which will give the copynb, which will allow finding the retrieve job.
  auto bestTapeFile=m_payload.archivefile().tapefiles().begin();
  while (bestTapeFile != m_payload.archivefile().tapefiles().end()) {
    if (bestTapeFile->vid() == bestVid)
      goto tapeFileFound;
    bestTapeFile++;
  }
  {
    std::stringstream err;
    err << "In RetrieveRequest::garbageCollect(): could not find tapefile for vid " << bestVid;
    throw exception::Exception(err.str());
  }
tapeFileFound:;
122
  auto tapeSelectionTime = t.secs(utils::Timer::resetCounter);
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
  auto bestJob=m_payload.mutable_jobs()->begin();
  while (bestJob!=m_payload.mutable_jobs()->end()) {
    if (bestJob->copynb() == bestTapeFile->copynb())
      goto jobFound;
    bestJob++;
  }
  {
    std::stringstream err;
    err << "In RetrieveRequest::garbageCollect(): could not find job for copynb " << bestTapeFile->copynb();
    throw exception::Exception(err.str());
  }
jobFound:;
  // We now need to grab the queue a requeue the request.
  RetrieveQueue rq(m_objectStore);
  ScopedExclusiveLock rql;
138
  Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, lc);
139
140
141
  // Enqueue add the job to the queue
  objectstore::MountPolicySerDeser mp;
  mp.deserialize(m_payload.mountpolicy());
142
143
144
  std::list<RetrieveQueue::JobToAdd> jta;
  jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), 
    mp, (signed)m_payload.schedulerrequest().entrylog().time()});
145
  rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc);
146
  auto jobsSummary=rq.getJobsSummary();
147
  auto queueUpdateTime = t.secs(utils::Timer::resetCounter);
148
149
150
151
152
  // We can now make the transition official
  bestJob->set_status(serializers::RetrieveJobStatus::RJS_Pending);
  m_payload.set_activecopynb(bestJob->copynb());
  setOwner(rq.getAddressIfSet());
  commit();
153
154
155
  Helpers::updateRetrieveQueueStatisticsCache(bestVid, jobsSummary.files, jobsSummary.bytes, jobsSummary.priority);
  rql.release();
  auto commitUnlockQueueTime = t.secs(utils::Timer::resetCounter);
156
157
158
159
160
161
162
163
164
165
166
  {
    log::ScopedParamContainer params(lc);
    params.add("jobObject", getAddressIfSet())
          .add("queueObject", rq.getAddressIfSet())
          .add("copynb", bestTapeFile->copynb())
          .add("vid", bestTapeFile->vid())
          .add("tapeSelectionTime", tapeSelectionTime)
          .add("queueUpdateTime", queueUpdateTime)
          .add("commitUnlockQueueTime", commitUnlockQueueTime);
    lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): requeued the request.");
  }
167
168
169
170
171
172
173
174
175
176
177
178
  timespec ts;
  // We will sleep a bit to make sure other processes can also access the queue
  // as we are very likely to be part of a tight loop.
  // TODO: ideally, end of session requeueing and garbage collection should be
  // done in parallel.
  // We sleep half the time it took to queue to give a chance to other lockers.
  double secSleep, fracSecSleep;
  fracSecSleep = std::modf(queueUpdateTime / 2, &secSleep);
  ts.tv_sec = secSleep;
  ts.tv_nsec = std::round(fracSecSleep * 1000 * 1000 * 1000);
  nanosleep(&ts, nullptr);
  auto sleepTime = t.secs();
179
180
181
182
183
  {
    log::ScopedParamContainer params(lc);
    params.add("jobObject", getAddressIfSet())
          .add("queueObject", rq.getAddressIfSet())
          .add("copynb", bestTapeFile->copynb())
184
185
186
187
188
          .add("vid", bestTapeFile->vid())
          .add("tapeSelectionTime", tapeSelectionTime)
          .add("queueUpdateTime", queueUpdateTime)
          .add("commitUnlockQueueTime", commitUnlockQueueTime)
          .add("sleepTime", sleepTime);
189
    lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): slept some time to not sit on the queue after GC requeueing.");
190
  }
191
192
}

193
//------------------------------------------------------------------------------
194
// RetrieveRequest::addJob()
195
//------------------------------------------------------------------------------
196
void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) {
197
  checkPayloadWritable();
198
  auto *tf = m_payload.add_jobs();
199
  tf->set_copynb(copyNb);
200
  tf->set_lastmountwithfailure(0);
201
202
203
204
205
  tf->set_maxretrieswithinmount(maxRetiesWithinMount);
  tf->set_maxtotalretries(maxTotalRetries);
  tf->set_retrieswithinmount(0);
  tf->set_totalretries(0);
  tf->set_status(serializers::RetrieveJobStatus::RJS_Pending);
206
207
208
}

//------------------------------------------------------------------------------
209
// RetrieveRequest::setSchedulerRequest()
210
//------------------------------------------------------------------------------
211
// Copies the scheduler-level retrieve request into the payload: requester,
// archive file id, destination and error report URLs, disk file info and
// creation log.
void RetrieveRequest::setSchedulerRequest(const cta::common::dataStructures::RetrieveRequest& retrieveRequest) {
  checkPayloadWritable();
  auto & request = *m_payload.mutable_schedulerrequest();
  // Who asked for the retrieve.
  auto & requester = *request.mutable_requester();
  requester.set_name(retrieveRequest.requester.name);
  requester.set_group(retrieveRequest.requester.group);
  // What is to be retrieved, and where to.
  request.set_archivefileid(retrieveRequest.archiveFileID);
  request.set_dsturl(retrieveRequest.dstURL);
  request.set_retrieveerrorreporturl(retrieveRequest.errorReportURL);
  // Structured members go through their serializer/deserializer helpers.
  DiskFileInfoSerDeser(retrieveRequest.diskFileInfo).serialize(*request.mutable_diskfileinfo());
  objectstore::EntryLogSerDeser(retrieveRequest.creationLog).serialize(*request.mutable_entrylog());
}

//------------------------------------------------------------------------------
226
// RetrieveRequest::getSchedulerRequest()
227
//------------------------------------------------------------------------------
228
// Rebuilds the scheduler-level retrieve request from the payload: requester,
// archive file id, creation log, destination and error report URLs and disk
// file info.
cta::common::dataStructures::RetrieveRequest RetrieveRequest::getSchedulerRequest() {
  checkPayloadReadable();
  const auto & request = m_payload.schedulerrequest();
  common::dataStructures::RetrieveRequest ret;
  ret.requester.name = request.requester().name();
  ret.requester.group = request.requester().group();
  ret.archiveFileID = request.archivefileid();
  // The deserialized entry log has to be copied back into the result:
  // deserializing into the local helper alone leaves ret.creationLog untouched.
  objectstore::EntryLogSerDeser el;
  el.deserialize(request.entrylog());
  ret.creationLog = el;
  ret.dstURL = request.dsturl();
  ret.errorReportURL = request.retrieveerrorreporturl();
  objectstore::DiskFileInfoSerDeser dfisd;
  dfisd.deserialize(request.diskfileinfo());
  ret.diskFileInfo = dfisd;
  return ret;
}

244
//------------------------------------------------------------------------------
245
// RetrieveRequest::getArchiveFile()
246
247
248
249
250
251
252
253
//------------------------------------------------------------------------------
// Returns the archive file description deserialized from the payload.
cta::common::dataStructures::ArchiveFile RetrieveRequest::getArchiveFile() {
  // Same guard as every other payload getter of this class.
  checkPayloadReadable();
  objectstore::ArchiveFileSerDeser af;
  af.deserialize(m_payload.archivefile());
  return af;
}


254
//------------------------------------------------------------------------------
255
// RetrieveRequest::setRetrieveFileQueueCriteria()
256
//------------------------------------------------------------------------------
257
// Stores the queueing criteria in the payload: the archive file, the mount
// policy, and one pending job per tape file of the archive file.
void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria) {
  checkPayloadWritable();
  ArchiveFileSerDeser(criteria.archiveFile).serialize(*m_payload.mutable_archivefile());
  // The mount policy does not depend on the tape files: serialize it once, and
  // also when the archive file has no tape file at all.
  MountPolicySerDeser(criteria.mountPolicy).serialize(*m_payload.mutable_mountpolicy());
  // Retry limits are hardcoded for the time being; the types match addJob().
  const uint16_t hardcodedRetriesWithinMount = 3;
  const uint16_t hardcodedTotalRetries = 6;
  for (auto &tf: criteria.archiveFile.tapeFiles) {
    addJob(tf.second.copyNb, hardcodedRetriesWithinMount, hardcodedTotalRetries);
  }
}

//------------------------------------------------------------------------------
269
// RetrieveRequest::dumpJobs()
270
//------------------------------------------------------------------------------
271
// Returns the copy number and status of every job of the request, in payload order.
auto RetrieveRequest::dumpJobs() -> std::list<JobDump> {
  checkPayloadReadable();
  std::list<JobDump> dumps;
  for (const auto & job: m_payload.jobs()) {
    JobDump entry;
    entry.copyNb = job.copynb();
    entry.status = job.status();
    dumps.push_back(entry);
  }
  return dumps;
}

//------------------------------------------------------------------------------
284
// RetrieveRequest::getJob()
285
//------------------------------------------------------------------------------
286
// Returns the copy number and status of the job matching `copyNb`.
// Throws NoSuchJob when the request has no job for that copy number.
auto  RetrieveRequest::getJob(uint16_t copyNb) -> JobDump {
  checkPayloadReadable();
  for (const auto & candidate: m_payload.jobs()) {
    // Skip jobs belonging to other copies.
    if (candidate.copynb() != copyNb) continue;
    JobDump found;
    found.copyNb = copyNb;
    found.status = candidate.status();
    return found;
  }
  throw NoSuchJob("In objectstore::RetrieveRequest::getJob(): job not found for this copyNb");
}

300
301
302
//------------------------------------------------------------------------------
// RetrieveRequest::getJobs()
//------------------------------------------------------------------------------
303
304
305
306
307
// Lists all the jobs of the request as (copy number, status) dumps, in payload order.
auto RetrieveRequest::getJobs() -> std::list<JobDump> {
  checkPayloadReadable();
  std::list<JobDump> jobList;
  for (const auto & job: m_payload.jobs()) {
    JobDump dump;
    dump.copyNb = job.copynb();
    dump.status = job.status();
    jobList.push_back(dump);
  }
  return jobList;
}

314
315
316
//------------------------------------------------------------------------------
// RetrieveRequest::addJobFailure()
//------------------------------------------------------------------------------
317
318
bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, 
    const std::string & failureReason, log::LogContext & lc) {
319
320
321
322
  checkPayloadWritable();
  // Find the job and update the number of failures
  // (and return the full request status: failed (true) or to be retried (false))
  // The request will go through a full requeueing if retried (in caller).
323
324
  for (size_t i=0; i<(size_t)m_payload.jobs_size(); i++) {
    auto &j=*m_payload.mutable_jobs(i);
325
326
327
328
329
330
331
332
    if (j.copynb() == copyNumber) {
      if (j.lastmountwithfailure() == mountId) {
        j.set_retrieswithinmount(j.retrieswithinmount() + 1);
      } else {
        j.set_retrieswithinmount(1);
        j.set_lastmountwithfailure(mountId);
      }
      j.set_totalretries(j.totalretries() + 1);
333
      * j.mutable_failurelogs()->Add() = failureReason;
334
335
336
    }
    if (j.totalretries() >= j.maxtotalretries()) {
      j.set_status(serializers::RJS_Failed);
337
338
339
      bool ret=finishIfNecessary(lc);
      if (!ret) commit();
      return ret;
340
341
    } else {
      j.set_status(serializers::RJS_Pending);
342
      commit();
343
344
345
      return false;
    }
  }
Eric Cano's avatar
Eric Cano committed
346
  throw NoSuchJob ("In RetrieveRequest::addJobFailure(): could not find job");
347
348
}

349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
//------------------------------------------------------------------------------
// RetrieveRequest::getRetryStatus()
//------------------------------------------------------------------------------
// Returns the retry counters and limits of the job matching `copyNumber`.
// Throws cta::exception::Exception when no job matches.
RetrieveRequest::RetryStatus RetrieveRequest::getRetryStatus(const uint16_t copyNumber) {
  checkPayloadReadable();
  for (const auto & job: m_payload.jobs()) {
    if (copyNumber != job.copynb()) continue;
    RetryStatus status;
    status.totalRetries = job.totalretries();
    status.maxTotalRetries = job.maxtotalretries();
    status.retriesWithinMount = job.retrieswithinmount();
    status.maxRetriesWithinMount = job.maxretrieswithinmount();
    return status;
  }
  throw cta::exception::Exception("In RetrieveRequest::getRetryStatus(): job not found()");
}


//------------------------------------------------------------------------------
// RetrieveRequest::statusToString()
//------------------------------------------------------------------------------
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Converts a retrieve job status into a human readable name.
// Values without a known name are rendered as "Unknown (<numeric value>)".
std::string RetrieveRequest::statusToString(const serializers::RetrieveJobStatus& status) {
  const char * knownName = nullptr;
  switch(status) {
  case serializers::RetrieveJobStatus::RJS_Complete:      knownName = "Complete";      break;
  case serializers::RetrieveJobStatus::RJS_Failed:        knownName = "Failed";        break;
  case serializers::RetrieveJobStatus::RJS_LinkingToTape: knownName = "LinkingToTape"; break;
  case serializers::RetrieveJobStatus::RJS_Pending:       knownName = "Pending";       break;
  case serializers::RetrieveJobStatus::RJS_Selected:      knownName = "Selected";      break;
  default: break;
  }
  if (knownName != nullptr) {
    return std::string(knownName);
  }
  std::string unknown("Unknown (");
  unknown += std::to_string(static_cast<uint64_t>(status));
  unknown += ")";
  return unknown;
}


389
390
391
//------------------------------------------------------------------------------
// RetrieveRequest::finishIfNecessary()
//------------------------------------------------------------------------------
392
// Removes the request when all its jobs are finished.
//
// Typically called after the status of one job was changed in memory. A job
// is finished when it is either complete or failed. Returns false (leaving
// the request untouched) as soon as one unfinished job is found; otherwise
// removes the request, logs the status of each copy and returns true.
bool RetrieveRequest::finishIfNecessary(log::LogContext & lc) {
  checkPayloadWritable();
  const auto & allJobs = m_payload.jobs();
  for (const auto & job: allJobs) {
    const auto jobStatus = job.status();
    const bool finished = jobStatus == serializers::RetrieveJobStatus::RJS_Complete
                       || jobStatus == serializers::RetrieveJobStatus::RJS_Failed;
    if (!finished) {
      return false;
    }
  }
  // Every job is either complete or failed: the request can go.
  remove();
  log::ScopedParamContainer params(lc);
  params.add("retrieveRequestObject", getAddressIfSet());
  for (const auto & job: allJobs) {
    const std::string paramName = std::string("statusForCopyNb") + std::to_string(job.copynb());
    params.add(paramName, statusToString(job.status()));
  }
  lc.log(log::INFO, "In RetrieveRequest::finishIfNecessary(): removed finished retrieve request.");
  return true;
}

414
415
416
//------------------------------------------------------------------------------
// RetrieveRequest::getJobStatus()
//------------------------------------------------------------------------------
417
418
419
420
421
422
423
424
// Returns the status of the job matching `copyNumber`.
// Throws exception::Exception when the request has no such job.
serializers::RetrieveJobStatus RetrieveRequest::getJobStatus(uint16_t copyNumber) {
  checkPayloadReadable();
  for (const auto & job: m_payload.jobs()) {
    if (job.copynb() == copyNumber) {
      return job.status();
    }
  }
  // No job for this copy number: report it to the caller.
  std::stringstream message;
  message << "In RetrieveRequest::getJobStatus(): could not find job for copynb=" << copyNumber;
  throw exception::Exception(message.str());
}

427
428
429
//------------------------------------------------------------------------------
// RetrieveRequest::asyncUpdateOwner()
//------------------------------------------------------------------------------
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
// Starts an asynchronous change of owner, from `previousOwner` to `owner`.
//
// The returned updater (owned by the caller) holds the callback which the
// backend applies to the serialized object. That callback:
//  - checks the object is a retrieve request owned by `previousOwner`,
//  - sets the new owner in the header,
//  - checks a job exists for `copyNumber`,
//  - fills the updater with the retrieve request and archive file information
//    found in the payload, for later retrieval through its getters.
//
// The callback throws cta::exception::Exception on parsing failure or wrong
// object type, and WrongPreviousOwner when the object is not owned by
// `previousOwner` or has no job for `copyNumber`.
auto RetrieveRequest::asyncUpdateOwner(uint16_t copyNumber, const std::string& owner, const std::string& previousOwner)
  -> AsyncOwnerUpdater* {
  std::unique_ptr<AsyncOwnerUpdater> ret(new AsyncOwnerUpdater);
  // The callback refers to the updater object itself: passing a reference to
  // the unique pointer led to strange behaviors.
  auto & retRef = *ret;
  ret->m_updaterCallback=
      [copyNumber, owner, previousOwner, &retRef](const std::string &in)->std::string {
        // We have a locked and fetched object, so we just need to work on its representation.
        serializers::ObjectHeader oh;
        if (!oh.ParseFromString(in)) {
          // Use the tolerant parser to assess the situation.
          oh.ParsePartialFromString(in);
          throw cta::exception::Exception(std::string("In RetrieveRequest::asyncUpdateJobOwner(): could not parse header: ")+
            oh.InitializationErrorString());
        }
        if (oh.type() != serializers::ObjectType::RetrieveRequest_t) {
          std::stringstream err;
          err << "In RetrieveRequest::asyncUpdateJobOwner()::lambda(): wrong object type: " << oh.type();
          throw cta::exception::Exception(err.str());
        }
        // We don't need to deserialize the payload to update owner...
        if (oh.owner() != previousOwner)
          throw WrongPreviousOwner("In RetrieveRequest::asyncUpdateJobOwner()::lambda(): Request not owned.");
        oh.set_owner(owner);
        // ... but we still need to extract information
        serializers::RetrieveRequest payload;
        if (!payload.ParseFromString(oh.payload())) {
          // Use the tolerant parser to assess the situation.
          payload.ParsePartialFromString(oh.payload());
          throw cta::exception::Exception(std::string("In RetrieveRequest::asyncUpdateJobOwner(): could not parse payload: ")+
            payload.InitializationErrorString());
        }
        // Find the copy number. The jobs are only read: iterate by reference
        // instead of copying the whole list.
        bool copyFound = false;
        for (const auto & j: payload.jobs()) {
          if (j.copynb() == copyNumber) {
            copyFound = true;
            break;
          }
        }
        // If we do not find the copy, return not owned as well...
        if (!copyFound)
          throw WrongPreviousOwner("In RetrieveRequest::asyncUpdateJobOwner()::lambda(): copyNb not found.");
        // We also need to gather all the job content for the user to get in-memory
        // representation.
        // TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest.
        // We could try and refactor this.
        const auto & schedulerRequest = payload.schedulerrequest();
        retRef.m_retieveRequest.archiveFileID = payload.archivefile().archivefileid();
        objectstore::EntryLogSerDeser el;
        el.deserialize(schedulerRequest.entrylog());
        retRef.m_retieveRequest.creationLog = el;
        objectstore::DiskFileInfoSerDeser dfi;
        dfi.deserialize(schedulerRequest.diskfileinfo());
        retRef.m_retieveRequest.diskFileInfo = dfi;
        retRef.m_retieveRequest.dstURL = schedulerRequest.dsturl();
        retRef.m_retieveRequest.errorReportURL = schedulerRequest.retrieveerrorreporturl();
        retRef.m_retieveRequest.requester.name = schedulerRequest.requester().name();
        retRef.m_retieveRequest.requester.group = schedulerRequest.requester().group();
        objectstore::ArchiveFileSerDeser af;
        af.deserialize(payload.archivefile());
        retRef.m_archiveFile = af;
        oh.set_payload(payload.SerializePartialAsString());
        return oh.SerializeAsString();
      };
  ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(), ret->m_updaterCallback));
  return ret.release();
}

495
496
497
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncOwnerUpdater::wait()
//------------------------------------------------------------------------------
498
499
500
501
// Waits for the owner update started by asyncUpdateOwner(): delegates to the
// wait() of the backend updater created there.
void RetrieveRequest::AsyncOwnerUpdater::wait() {
  m_backendUpdater->wait();
}

502
503
504
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncOwnerUpdater::getArchiveFile()
//------------------------------------------------------------------------------
505
506
507
508
// Returns the archive file stored in this updater. It is filled by the update
// callback set up in asyncUpdateOwner(), from the payload of the request.
const common::dataStructures::ArchiveFile& RetrieveRequest::AsyncOwnerUpdater::getArchiveFile() {
  return m_archiveFile;
}

509
510
511
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncOwnerUpdater::getRetrieveRequest()
//------------------------------------------------------------------------------
512
513
514
515
// Returns the retrieve request stored in this updater. It is filled by the
// update callback set up in asyncUpdateOwner(), from the payload of the request.
const common::dataStructures::RetrieveRequest& RetrieveRequest::AsyncOwnerUpdater::getRetrieveRequest() {
  return m_retieveRequest;
}

516
517
518
//------------------------------------------------------------------------------
// RetrieveRequest::setActiveCopyNumber()
//------------------------------------------------------------------------------
519
520
521
522
523
// Records `activeCopyNb` as the active copy number in the payload.
// The payload has to be writable.
void RetrieveRequest::setActiveCopyNumber(uint32_t activeCopyNb) {
  checkPayloadWritable();
  m_payload.set_activecopynb(activeCopyNb);
}

524
525
526
//------------------------------------------------------------------------------
// RetrieveRequest::getActiveCopyNumber()
//------------------------------------------------------------------------------
527
528
uint32_t RetrieveRequest::getActiveCopyNumber() {
  throw exception::Exception(std::string(__FUNCTION__) + " not implemented");
529
530
}

531
532
533
//------------------------------------------------------------------------------
// RetrieveRequest::getRetrieveFileQueueCriteria()
//------------------------------------------------------------------------------
534
// Rebuilds the queueing criteria (archive file and mount policy) from the payload.
cta::common::dataStructures::RetrieveFileQueueCriteria RetrieveRequest::getRetrieveFileQueueCriteria() {
  checkPayloadReadable();
  cta::common::dataStructures::RetrieveFileQueueCriteria criteria;
  // Archive file part.
  ArchiveFileSerDeser archiveFile;
  archiveFile.deserialize(m_payload.archivefile());
  criteria.archiveFile = archiveFile;
  // Mount policy part.
  MountPolicySerDeser mountPolicy;
  mountPolicy.deserialize(m_payload.mountpolicy());
  criteria.mountPolicy = mountPolicy;
  return criteria;
}

546
547
548
//------------------------------------------------------------------------------
// RetrieveRequest::getEntryLog()
//------------------------------------------------------------------------------
549
550
551
552
553
554
// Returns the entry log of the scheduler request, deserialized from the payload.
cta::common::dataStructures::EntryLog RetrieveRequest::getEntryLog() {
  checkPayloadReadable();
  EntryLogSerDeser el;
  el.deserialize(m_payload.schedulerrequest().entrylog());
  return el;
}
555

556
557
558
//------------------------------------------------------------------------------
// RetrieveRequest::dump()
//------------------------------------------------------------------------------
559
std::string RetrieveRequest::dump() {
560
  checkPayloadReadable();
561
562
563
564
565
566
  google::protobuf::util::JsonPrintOptions options;
  options.add_whitespace = true;
  options.always_print_primitive_fields = true;
  std::string headerDump;
  google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options);
  return headerDump;
567
568
}

569
570
571
//------------------------------------------------------------------------------
// RetrieveRequest::asyncDeleteJob()
//------------------------------------------------------------------------------
572
573
574
575
576
577
// Starts an asynchronous deletion of this object in the object store.
// Returns a newly allocated deleter which the caller owns (the unique_ptr is
// released on return); AsyncJobDeleter::wait() delegates to the backend deleter.
RetrieveRequest::AsyncJobDeleter * RetrieveRequest::asyncDeleteJob() {
  std::unique_ptr<AsyncJobDeleter> ret(new AsyncJobDeleter);
  ret->m_backendDeleter.reset(m_objectStore.asyncDelete(getAddressIfSet()));
  return ret.release();
}

578
579
580
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncJobDeleter::wait()
//------------------------------------------------------------------------------
581
void RetrieveRequest::AsyncJobDeleter::wait() {
582
  m_backendDeleter->wait();
583
584
}

585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
//------------------------------------------------------------------------------
// RetrieveRequest::getFailures()
//------------------------------------------------------------------------------
// Returns the failure logs of all the jobs, concatenated job after job in
// payload order.
std::list<std::string> RetrieveRequest::getFailures() {
  checkPayloadReadable();
  std::list<std::string> failures;
  for (const auto & job: m_payload.jobs()) {
    const auto & logs = job.failurelogs();
    // Append all the logs of this job at once.
    failures.insert(failures.end(), logs.begin(), logs.end());
  }
  return failures;
}


600
601
}} // namespace cta::objectstore