RecallReportPacker.cpp 8.78 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
25
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
26
#include "castor/log/Logger.hpp"
27
28
#include "log.h"
#include "serrno.h"
29

30
#include <signal.h>
31

32
namespace{
33
  struct failedReportRecallResult : public castor::exception::Exception{
34
35
36
37
    failedReportRecallResult(const std::string& s): Exception(s){}
  };
}

David COME's avatar
David COME committed
38
39
40
using castor::log::LogContext;
using castor::log::Param;

41
42
43
44
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
45
46
47
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
48
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, log::LogContext lc):
49
ReportPackerInterface<detail::Recall>(lc),
50
        m_workerThread(*this),m_errorHappened(false), m_retrieveMount(retrieveMount){
51
52

}
53
54
55
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
56
RecallReportPacker::~RecallReportPacker(){
57
  castor::server::MutexLocker ml(&m_producterProtection);
58
}
59
60
61
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
62
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob,
63
  u_int32_t checksum, u_int64_t size){
64
  std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulRetrieveJob),checksum,size));
65
  castor::server::MutexLocker ml(&m_producterProtection);
66
67
  m_fifo.push(rep.release());
}
68
69
70
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------  
71
72
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob,
  const castor::exception::Exception &ex){
73
  std::unique_ptr<Report> rep(new ReportError(std::move(failedRetrieveJob),ex));
74
  castor::server::MutexLocker ml(&m_producterProtection);
75
76
  m_fifo.push(rep.release());
}
77
78
79
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
80
void RecallReportPacker::reportEndOfSession(){
81
  castor::server::MutexLocker ml(&m_producterProtection);
82
83
84
  m_fifo.push(new ReportEndofSession());
}
  
85
86
87
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
88
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
89
  castor::server::MutexLocker ml(&m_producterProtection);
90
91
  m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
92

93
94
//------------------------------------------------------------------------------
//ReportSuccessful::execute
95
//------------------------------------------------------------------------------
96
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
97
  m_successfulRetrieveJob->complete(m_checksum, m_size);
98
}
99

100
101
102
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
103
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
104
    if(!parent.errorHappened()){
105
      parent.m_retrieveMount->complete();
Daniele Kruse's avatar
Daniele Kruse committed
106
      parent.m_lc.log(LOG_INFO,"Nominal RecallReportPacker::EndofSession has been reported");
107
      if (parent.m_watchdog) {
108
        parent.m_watchdog->addParameter(log::Param("status","success"));
109
110
111
112
        // We have a race condition here between the processing of this message by
        // the initial process and the printing of the end-of-session log, triggered
        // by the end our process. To delay the latter, we sleep half a second here.
        usleep(500*1000);
113
      }
114
115
116
    }
    else {
      const std::string& msg ="RecallReportPacker::EndofSession has been reported  but an error happened somewhere in the process";
117
      parent.m_lc.log(LOG_ERR,msg);
118
      parent.m_retrieveMount->complete();
119
120
      if (parent.m_watchdog) {
        parent.m_watchdog->addParameter(log::Param("status","failure"));
121
122
123
124
        // We have a race condition here between the processing of this message by
        // the initial process and the printing of the end-of-session log, triggered
        // by the end our process. To delay the latter, we sleep half a second here.
        usleep(500*1000);
125
      }
126
    }
127
}
128
129
130
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
131
132
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
  if(parent.m_errorHappened) {
133
    parent.m_retrieveMount->complete();
134
135
    LogContext::ScopedParam(parent.m_lc,Param("errorCode",m_error_code));
    parent.m_lc.log(LOG_ERR,m_message);
136
137
  }
  else{
138
   const std::string& msg ="RecallReportPacker::EndofSessionWithErrors has been reported  but NO error was detected during the process";
139
   parent.m_lc.log(LOG_ERR,msg);  
140
   parent.m_retrieveMount->complete();
141
  }
142
143
  if (parent.m_watchdog) {
    parent.m_watchdog->addParameter(log::Param("status","failure"));
144
145
146
147
    // We have a race condition here between the processing of this message by
    // the initial process and the printing of the end-of-session log, triggered
    // by the end our process. To delay the latter, we sleep half a second here.
    usleep(500*1000);
148
  }
149
}
150
151
152
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
153
154
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
  parent.m_errorHappened=true;
155
156
  parent.m_lc.log(LOG_ERR,m_ex.getMessageValue());
  m_failedRetrieveJob->failed(cta::exception::Exception(m_ex.getMessageValue()));
157
158
}
//------------------------------------------------------------------------------
159
160
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
161
162
163
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent):
m_parent(parent) {
}
164
165
166
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
167
void RecallReportPacker::WorkerThread::run(){
168
169
  m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
  m_parent.m_lc.log(LOG_DEBUG, "Starting RecallReportPacker thread");
170
  try{
171
172
173
174
175
176
177
    while(1) {    
      std::unique_ptr<Report> rep(m_parent.m_fifo.pop());    
      rep->execute(m_parent);

      if(rep->goingToEnd()) {
        break;
      }
178
    }
179
  }
180
  catch(const castor::exception::Exception& e){
181
182
    //we get there because to tried to close the connection and it failed
    //either from the catch a few lines above or directly from rep->execute
183
    m_parent.m_lc.log(LOG_ERR,"tried to report endOfSession(WithError) and got an exception, cant do much more");
184
185
186
187
    if (m_parent.m_watchdog) {
      m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
      m_parent.m_watchdog->addParameter(log::Param("status","failure"));
    }
188
  }
189
  m_parent.m_lc.log(LOG_DEBUG, "Finishing RecallReportPacker thread");
190
}
191
192
193
194
195
196
197
198

//------------------------------------------------------------------------------
//errorHappened()
//------------------------------------------------------------------------------
bool RecallReportPacker::errorHappened() {
  return m_errorHappened || (m_watchdog && m_watchdog->errorHappened());
}

199
}}}}