ArchiveMount.cpp 15.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
20
#include <iostream>

21
#include "scheduler/ArchiveMount.hpp"
22
#include "common/make_unique.hpp"
23
#include "objectstore/Backend.hpp"
24

25
26
27
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
28
cta::ArchiveMount::ArchiveMount(catalogue::Catalogue & catalogue)
  : m_catalogue(catalogue),
    m_sessionRunning(false) {
  // Only the catalogue reference and the session flag are set here; m_dbMount
  // is not mentioned in the initializer list and is default-constructed.
}

31
32
33
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
34
35
cta::ArchiveMount::ArchiveMount(catalogue::Catalogue & catalogue,
  std::unique_ptr<SchedulerDatabase::ArchiveMount> dbMount): m_catalogue(catalogue),
    m_sessionRunning(false) {
  // Check the cast on the pointer that dbMount still owns BEFORE giving up
  // ownership. Releasing first and then finding the cast result null would
  // leak the object; here, on failure, dbMount still owns it and destroys it
  // when the exception propagates. A null dbMount also fails this check.
  auto * const archiveDbMount =
    dynamic_cast<SchedulerDatabase::ArchiveMount*>(dbMount.get());
  if(!archiveDbMount) {
    throw WrongMountType(std::string(__FUNCTION__) +
      ": could not cast mount to SchedulerDatabase::ArchiveMount");
  }
  // Transfer ownership only once the cast is known to be valid.
  dbMount.release();
  m_dbMount.reset(archiveDbMount);
}

45
46
47
//------------------------------------------------------------------------------
// getMountType
//------------------------------------------------------------------------------
48
cta::common::dataStructures::MountType cta::ArchiveMount::getMountType() const {
  // The mount type is taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.mountType;
}
51

Steven Murray's avatar
Steven Murray committed
52
53
54
//------------------------------------------------------------------------------
// getVid
//------------------------------------------------------------------------------
55
std::string cta::ArchiveMount::getVid() const {
56
  return m_dbMount->mountInfo.vid;
Steven Murray's avatar
Steven Murray committed
57
58
}

59
60
61
62
63
64
65
66
//------------------------------------------------------------------------------
// getDrive
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getDrive() const {
  // Drive name as recorded in the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.drive;
}


67
68
69
//------------------------------------------------------------------------------
// getPoolName
//------------------------------------------------------------------------------
70
std::string cta::ArchiveMount::getPoolName() const {
71
  return m_dbMount->mountInfo.tapePool;
72
73
}

74
75
76
77
//------------------------------------------------------------------------------
// getVo
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getVo() const {
78
    return m_dbMount->mountInfo.vo;
79
80
81
}

//------------------------------------------------------------------------------
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// getMediaType
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getMediaType() const{
    return m_dbMount->mountInfo.mediaType;
}

//------------------------------------------------------------------------------
// getVendor
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getVendor() const {
  // Tape vendor as recorded in the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.vendor;
}

//------------------------------------------------------------------------------
// getCapacityInBytes
97
//------------------------------------------------------------------------------
98
99
100
uint64_t cta::ArchiveMount::getCapacityInBytes() const
{
    return m_dbMount->mountInfo.capacityInBytes;
101
102
}

103
104
105
106
107
108
109
//------------------------------------------------------------------------------
// getNbFiles
//------------------------------------------------------------------------------
uint32_t cta::ArchiveMount::getNbFiles() const {
  return m_dbMount->nbFilesCurrentlyOnTape;
}

110
111
112
//------------------------------------------------------------------------------
// createDiskReporter
//------------------------------------------------------------------------------
113
cta::disk::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) {
  // Delegates to the mount's reporter factory and hands back whatever pointer
  // the factory produced for this URL.
  cta::disk::DiskReporter * const reporter = m_reporterFactory.createDiskReporter(URL);
  return reporter;
}

117
118
119
//------------------------------------------------------------------------------
// getMountTransactionId
//------------------------------------------------------------------------------
120
std::string cta::ArchiveMount::getMountTransactionId() const {
  // Without a DB mount there is no mount id to report.
  if (!m_dbMount.get())
    throw exception::Exception("In cta::ArchiveMount::getMountTransactionId(): got NULL dbMount");
  // The transaction id is the streamed textual form of mountInfo.mountId.
  std::ostringstream idStream;
  idStream << m_dbMount->mountInfo.mountId;
  return idStream.str();
}
127

128
//------------------------------------------------------------------------------
129
130
// updateCatalogueWithTapeFilesWritten
//------------------------------------------------------------------------------
131
void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeItemWrittenPointer> &tapeFilesWritten) {
  // Thin forwarder: the whole set is handed to the catalogue in one call.
  auto & catalogue = m_catalogue;
  catalogue.filesWrittenToTape(tapeFilesWritten);
}

135
//------------------------------------------------------------------------------
136
137
138
139
140
141
142
143
144
145
146
147
148
// getNextJobBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested, 
  uint64_t bytesRequested, log::LogContext& logContext) {
  // Check we are still running the session
  if (!m_sessionRunning)
    throw SessionNotRunning("In ArchiveMount::getNextJobBatch(): trying to get job from complete/not started session");
  // try and get a new job from the DB side
  std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested, 
    bytesRequested, logContext));
  std::list<std::unique_ptr<ArchiveJob>> ret;
  // We prepare the response
  for (auto & sdaj: dbJobBatch) {
Eric Cano's avatar
Eric Cano committed
149
    ret.emplace_back(new ArchiveJob(this, m_catalogue,
150
151
152
153
154
155
      sdaj->archiveFile, sdaj->srcURL, sdaj->tapeFile));
    ret.back()->m_dbJob.reset(sdaj.release());
  }
  return ret;
}

156
157
158
//------------------------------------------------------------------------------
// reportJobsBatchWritten
//------------------------------------------------------------------------------
159
void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs,
160
    std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, std::queue<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>>& failedToReportArchiveJobs,cta::log::LogContext& logContext) {
161
  std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten;
162
  std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
163
  std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> validatedSuccessfulDBArchiveJobs;
164
  std::unique_ptr<cta::ArchiveJob> job;
165
  std::string failedValidationJobReportURL;
166
  try{
167
168
169
170
171
    uint64_t files=0;
    uint64_t bytes=0;
    double catalogueTime=0;
    double schedulerDbTime=0;
    double clientReportingTime=0;
172
173
174
175
    while(!successfulArchiveJobs.empty()) {
      // Get the next job to report and make sure we will not attempt to process it twice.
      job = std::move(successfulArchiveJobs.front());
      successfulArchiveJobs.pop();
176
      if (!job.get()) continue;
177
178
179
180
181
182
      cta::log::ScopedParamContainer params(logContext);
      params.add("tapeVid",job->tapeFile.vid)
            .add("mountType",cta::common::dataStructures::toString(job->m_mount->getMountType()))
            .add("fileId",job->archiveFile.archiveFileID)
            .add("type", "ReportSuccessful");
      logContext.log(cta::log::INFO, "In cta::ArchiveMount::reportJobsBatchTransferred(), archive job succesful.");
183
184
185
186
187
      try {
        tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release());
      } catch (const cta::exception::Exception &ex){
        //We put the not validated job into this list in order to insert the job
        //into the failedToReportArchiveJobs list in the exception catching block
188
        failedValidationJobReportURL = job->reportURL();
189
        validatedSuccessfulDBArchiveJobs.emplace_back(std::move(job->m_dbJob));
190
191
        throw ex;
      }
192
193
      files++;
      bytes+=job->archiveFile.fileSize;
194
      validatedSuccessfulArchiveJobs.emplace_back(std::move(job));      
195
      job.reset();
196
    }
197
198
199
200
201
202
    while (!skippedFiles.empty()) {
      auto tiwup = cta::make_unique<cta::catalogue::TapeItemWritten>();
      *tiwup = skippedFiles.front();
      skippedFiles.pop();
      tapeItemsWritten.emplace(tiwup.release());
    }
203
    utils::Timer t;
204
205
206
207
208
209
210
    
    // Now get the db mount to mark the jobs as successful.
    // Extract the db jobs from the scheduler jobs.
    for (auto &schJob: validatedSuccessfulArchiveJobs) {
      validatedSuccessfulDBArchiveJobs.emplace_back(std::move(schJob->m_dbJob));
    }
    validatedSuccessfulArchiveJobs.clear();
211

212
    updateCatalogueWithTapeFilesWritten(tapeItemsWritten);
213
    catalogueTime=t.secs(utils::Timer::resetCounter);
214
215
    {
      cta::log::ScopedParamContainer params(logContext);
216
      params.add("tapeFilesWritten", tapeItemsWritten.size())
217
218
219
            .add("files", files)
            .add("bytes", bytes)
            .add("catalogueTime", catalogueTime);
220
      logContext.log(cta::log::INFO, "Catalog updated for batch of jobs");   
221
    }
222
    
223
    // We can now pass  thevalidatedSuccessfulArchiveJobs list for the dbMount to process. We are done at that point.
224
    // Reporting to client will be queued if needed and done in another process.
225
    m_dbMount->setJobBatchTransferred(validatedSuccessfulDBArchiveJobs, logContext);
226
227
228
229
230
231
232
233
    schedulerDbTime=t.secs(utils::Timer::resetCounter);
    cta::log::ScopedParamContainer params(logContext);
    params.add("files", files)
          .add("bytes", bytes)
          .add("catalogueTime", catalogueTime)
          .add("schedulerDbTime", schedulerDbTime)
          .add("totalTime", catalogueTime  + schedulerDbTime + clientReportingTime);
    logContext.log(log::INFO, "In ArchiveMount::reportJobsBatchWritten(): recorded a batch of archive jobs in metadata.");
234
235
236
237
238
239
240
241
  } catch (const cta::objectstore::Backend::NoSuchObject& ex){
    cta::log::ScopedParamContainer params(logContext);
    params.add("exceptionMessageValue", ex.getMessageValue());
    if (job.get()) {
      params.add("fileId", job->archiveFile.archiveFileID)
            .add("diskInstance", job->archiveFile.diskInstance)
            .add("diskFileId", job->archiveFile.diskFileId)
            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
242
            .add("reportURL", failedValidationJobReportURL);
243
244
245
    }
    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): job does not exist in the objectstore.";
    logContext.log(cta::log::WARNING, msg_error);
Michael Davis's avatar
Michael Davis committed
246
  } catch(const cta::exception::Exception& e){
247
248
249
250
251
252
253
    cta::log::ScopedParamContainer params(logContext);
    params.add("exceptionMessageValue", e.getMessageValue());
    if (job.get()) {
      params.add("fileId", job->archiveFile.archiveFileID)
            .add("diskInstance", job->archiveFile.diskInstance)
            .add("diskFileId", job->archiveFile.diskFileId)
            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
254
            .add("reportURL", failedValidationJobReportURL);
255
    }
256
    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception";
257
    logContext.log(cta::log::ERR, msg_error);
258
259
260
261
262
263
264
    //If validatedSuccessfulArchiveJobs has still jobs in it, it means that
    //the validation job->validateAndGetTapeFileWritten() failed for one job and
    //threw an exception. We will then have to fail all the others.
    for(auto &ctaJob: validatedSuccessfulArchiveJobs){
      if(ctaJob.get())
        validatedSuccessfulDBArchiveJobs.emplace_back(std::move(ctaJob->m_dbJob));
    }
265
    for(auto &aj: validatedSuccessfulDBArchiveJobs){
266
267
      if(aj.get())
        failedToReportArchiveJobs.push(std::move(aj));
268
    }
269
    throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
Michael Davis's avatar
Michael Davis committed
270
  } catch(const std::exception& e){
271
272
273
274
275
276
277
278
    cta::log::ScopedParamContainer params(logContext);
    params.add("exceptionWhat", e.what());
    if (job.get()) {
      params.add("fileId", job->archiveFile.archiveFileID)
            .add("diskInstance", job->archiveFile.diskInstance)
            .add("diskFileId", job->archiveFile.diskFileId)
            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
    }
279
    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an standard exception";
280
    logContext.log(cta::log::ERR, msg_error);
281
    for(auto &aj: validatedSuccessfulDBArchiveJobs){
282
283
      if(aj.get())
        failedToReportArchiveJobs.push(std::move(aj));
284
    }
285
286
287
288
289
    throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
  }
}


290
//------------------------------------------------------------------------------
291
// complete
292
//------------------------------------------------------------------------------
293
void cta::ArchiveMount::complete() {
294
295
296
297
  // Just set the session as complete in the DB.
  m_dbMount->complete(time(NULL));
  // and record we are done with the mount
  m_sessionRunning = false;
298
}
299

300
301
302
303
304
305
306
//------------------------------------------------------------------------------
// abort
//------------------------------------------------------------------------------
void cta::ArchiveMount::abort() {
  complete();
}

307
308
309
310
311
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
cta::ArchiveMount::~ArchiveMount() throw() {
  // Nothing to do explicitly: members clean themselves up.
}
312
313
314
315

//------------------------------------------------------------------------------
// setDriveStatus()
//------------------------------------------------------------------------------
316
317
void cta::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, const cta::optional<std::string> & reason) {
  // Forward the new status to the DB mount, timestamped with the current time.
  const time_t now = time(NULL);
  m_dbMount->setDriveStatus(status, now, reason);
}
319

320
321
322
323
324
325
326
//------------------------------------------------------------------------------
// setTapeSessionStats()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) {
  // Thin forwarder: the DB mount records the session statistics.
  auto & dbMount = *m_dbMount;
  dbMount.setTapeSessionStats(stats);
}

327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
//------------------------------------------------------------------------------
// setTapeMounted()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setTapeMounted(cta::log::LogContext& logContext) const {
  // Best-effort: tells the catalogue the tape (vid) was mounted for archive in
  // this drive, and logs the outcome with the time spent in the catalogue.
  // A catalogue failure is logged as a warning and NOT propagated.
  utils::Timer t;
  log::ScopedParamContainer spc(logContext);
  try {
    m_catalogue.tapeMountedForArchive(m_dbMount->getMountInfo().vid, m_dbMount->getMountInfo().drive);
    auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
    spc.add("catalogueTime", catalogueTime);
    logContext.log(log::INFO, "In ArchiveMount::setTapeMounted(): success.");
  } catch (const cta::exception::Exception &ex) {
    auto catalogueTimeFailed = t.secs(cta::utils::Timer::resetCounter);
    // Include the cause: previously the exception was caught and discarded,
    // leaving the warning without any reason.
    spc.add("catalogueTime", catalogueTimeFailed)
       .add("exceptionMessageValue", ex.getMessageValue());
    logContext.log(cta::log::WARNING,
      "Failed to update catalogue for the tape mounted for archive.");
  }
}

346
347
348
349
350
351
//------------------------------------------------------------------------------
// setTapeFull()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setTapeFull() {
  // Tell the catalogue that the currently mounted tape has no space left.
  const auto & mountInfo = m_dbMount->getMountInfo();
  m_catalogue.noSpaceLeftOnTape(mountInfo.vid);
}