MigrationReportPacker.hpp 3.11 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/******************************************************************************
 *                      MigrationReportPacker.hpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

#include "castor/tape/tapeserver/daemon/MockTapeGateway.hpp"
28
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
29
30
31
32
33
34
35
36
37
38
#include "castor/tape/tapeserver/daemon/MigrationJob.hpp"
#include <list>

class MigrationReportPacker {
public:
  MigrationReportPacker(MockTapeGateway & tg): m_workerThread(*this),
          m_tapeGateway(tg) {}
  void reportCompletedJob(MigrationJob mj) {
    Report rep;
    rep.migrationJob = mj;
39
    castor::tape::threading::MutexLocker ml(&m_producterProtection);
40
41
42
43
44
    m_fifo.push(rep);
  }
  void reportFlush() {
    Report rep;
    rep.flush = true;
45
    castor::tape::threading::MutexLocker ml(&m_producterProtection);
46
47
48
49
50
    m_fifo.push(rep);
  }
  void reportEndOfSession() {
    Report rep;
    rep.EoSession = true;
51
    castor::tape::threading::MutexLocker ml(&m_producterProtection);
52
53
54
55
    m_fifo.push(rep);
  }
  void startThreads() { m_workerThread.start(); }
  void waitThread() { m_workerThread.wait(); }
56
  virtual ~MigrationReportPacker() { castor::tape::threading::MutexLocker ml(&m_producterProtection); }
57
58
59
60
61
62
63
64
65
private:
  class Report {
  public:
    Report(): flush(false), EoSession(false), 
	    migrationJob(-1, -1, -1) {}
    bool flush;
    bool EoSession;
    MigrationJob migrationJob;
  };
66
  class WorkerThread: public castor::tape::threading::Thread {
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
  public:
    WorkerThread(MigrationReportPacker& parent): m_parent(parent) {}
    void run() {
      while(1) {
	MigrationReportPacker::Report rep = m_parent.m_fifo.pop();
	if (rep.flush || rep.EoSession) {
	  m_parent.m_tapeGateway.reportMigratedFiles(m_parent.m_currentReport);
	  m_parent.m_currentReport.clear();
	  if (rep.EoSession) {
	    m_parent.m_tapeGateway.reportEndOfSession(false);
	    return;
	  }
	} else {
	  m_parent.m_currentReport.push_back(rep.migrationJob);
	}
      }
    }
    MigrationReportPacker & m_parent;
  } m_workerThread;
  friend class WorkerThread;
  MockTapeGateway & m_tapeGateway;
88
  castor::tape::threading::BlockingQueue<Report> m_fifo;
89
  std::list<MigrationJob> m_currentReport;
90
  castor::tape::threading::Mutex m_producterProtection;
91
};