TapeReadSingleThread.hpp 3.59 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/******************************************************************************
 *                      TapeReadSingleThread.hpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

27
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
28
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
29
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
30
#include "castor/tape/tapeserver/threading/Threading.hpp"
31
#include "castor/tape/tapeserver/drive/Drive.hpp"
32
#include "castor/tape/tapeserver/file/File.hpp"
33
34
35
#include <iostream>
#include <stdio.h>

36
37
38
39
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
40
/**
 * Single thread reading files from tape during a tape read session.
 *
 * The thread pops TapeReadTask objects one at a time from the task queue
 * (m_tasks) and executes them against a tape read session opened on the
 * drive. As the queue drains, it asks the task injector for more work.
 *
 * The members m_tasks, m_taskInjector, m_drive, m_vid, m_logContext and
 * m_filesProcessed are not declared in this class; they are presumably
 * provided by TapeSingleThreadInterface<TapeReadTask> (to be confirmed
 * against that header).
 */
class TapeReadSingleThread : public TapeSingleThreadInterface<TapeReadTask>{
public:
  /**
   * Constructor.
   * @param drive the tape drive, forwarded to the base class
   * @param vid the volume id of the tape, forwarded to the base class
   * @param maxFilesRequest the number of files asked for in each request
   * sent to the task injector; half of this value is also the queue level
   * at which a refill is requested
   * @param lc the log context, forwarded to the base class
   */
  TapeReadSingleThread(castor::tape::drives::DriveInterface & drive,
          const std::string vid, uint64_t maxFilesRequest,
          castor::log::LogContext & lc):
   TapeSingleThreadInterface<TapeReadTask>(drive, vid, lc),
   // Without this initialisation the threshold arithmetic in
   // popAndRequestMoreJobs() would read an indeterminate value.
   m_maxFilesRequest(maxFilesRequest) {}

  /**
   * Sets the task injector that will be asked for more tasks.
   * This must be called before the thread runs, as popAndRequestMoreJobs()
   * dereferences the pointer without checking it.
   * @param ti pointer to the task injector (not owned by this object)
   */
  void setTaskInjector(TaskInjector * ti) { m_taskInjector = ti; }

private:
  /**
   * Pops the next task from the queue and, depending on how many tasks are
   * left after the pop, asks the task injector for more.
   * @return the popped task; run() takes care of deleting it
   */
  TapeReadTask * popAndRequestMoreJobs() {
    castor::tape::threading::BlockingQueue<TapeReadTask *>::valueRemainingPair
      vrp = m_tasks.popGetSize();
    // NOTE(review): the meaning of the second argument (1000) of
    // requestInjection() is not visible from this file; it is kept as is.
    // If we just passed (down) the half full limit, ask for more
    // (the remaining value is after pop)
    if(vrp.remaining + 1 == m_maxFilesRequest/2) {
      // This is not a last call
      m_taskInjector->requestInjection(m_maxFilesRequest, 1000, false);
    } else if (0 == vrp.remaining) {
      // This is a last call: if the task injector comes up empty on this
      // one, he'll call it the end. The flag therefore has to be true here
      // (it used to be false, which made both branches identical).
      m_taskInjector->requestInjection(m_maxFilesRequest, 1000, true);
    }
    return vrp.value;
  }

  /**
   * Body of the thread: opens the tape read session, then executes the
   * tasks as they arrive, until a task reporting endOfWork() is popped.
   */
  virtual void run() {
    // First we have to initialise the tape read session
    m_logContext.pushOrReplace(log::Param("thread", "tapeRead"));
    std::auto_ptr<castor::tape::tapeFile::ReadSession> rs;
    try {
      rs.reset(new castor::tape::tapeFile::ReadSession(m_drive, m_vid));
      m_logContext.log(LOG_INFO, "Tape read session session successfully started");
    } catch (castor::exception::Exception & ex) {
      m_logContext.log(LOG_ERR, "Failed to start tape read session");
      // rs stays empty from here on; the loop below checks for it.
      // TODO: log and unroll the session
      // TODO: add an unroll mode to the tape read task. (Similar to exec, but pushing blocks marked in error)
    }
    // Then we will loop on the tasks as they get from
    // the task injector
    while(1) {
      // The task is held by an auto_ptr so that it is deleted even if
      // execute() throws.
      std::auto_ptr<TapeReadTask> task(popAndRequestMoreJobs());
      const bool end = task->endOfWork();
      if (!end) {
        if (rs.get()) {
          task->execute(*rs, m_logContext);
        } else {
          // The session could not be started: dereferencing the empty
          // pointer would be undefined behaviour, so the task is dropped.
          // The queue still gets drained until the end of work marker.
          m_logContext.log(LOG_ERR, "Skipping tape read task: no tape read session available");
        }
      }
      // Every popped task is counted, including the end of work marker
      // (unchanged accounting).
      m_filesProcessed++;
      if (end) {
        return;
      }
    }
  }

  /// Number of files asked for in each request to the task injector
  uint64_t m_maxFilesRequest;
};
95
96
97
98
99
}
}
}
}