RecallTaskInjector.cpp 8.42 KB
Newer Older
1
#include "castor/tape/tapeserver/daemon/RecallTaskInjector.hpp"
2
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
David COME's avatar
David COME committed
3
#include "castor/log/LogContext.hpp"
4
5
6
#include "castor/tape/tapegateway/FilesToRecallList.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
7
8
9
10
#include "castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
11
#include "log.h"
12
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
13
14
#include <stdint.h>

David COME's avatar
David COME committed
15
16
17
using castor::log::LogContext;
using castor::log::Param;

18
namespace{
19
20
21
22
23
24
25
26
  /**
   *  function to set a NULL the owning FilesToMigrateList  of a FileToMigrateStruct
   *   Indeed, a clone of a structure will only do a shallow copy (sic).
   *   Otherwise at the second destruction the object will try to remove itself 
   *   from the owning list and then boom !
   * @param ptr a pointer to an object to change
   * @return the parameter ptr 
   */
27
28
29
30
31
  castor::tape::tapegateway::FileToRecallStruct* removeOwningList(castor::tape::tapegateway::FileToRecallStruct* ptr){
    ptr->setFilesToRecallList(NULL);
    return ptr;
  }
}
32

33
34
35
36
37
namespace castor{
namespace tape{
namespace tapeserver{
namespace daemon {
  
38
/**
 * Constructor. Only stores its arguments; no thread is started here.
 *
 * @param mm memory manager later handed to every task created by injectBulkRecalls()
 * @param tapeReader receiver of the TapeReadTask objects
 * @param diskWriter receiver of the DiskWriteTask objects
 * @param client interface asked for the lists of files to recall
 * @param maxFiles maximum number of files asked for in one request
 * @param byteSizeThreshold byte limit asked for in one request
 * @param lc log context, taken by value and kept as this object's own copy
 */
RecallTaskInjector::RecallTaskInjector(
        RecallMemoryManager & mm,
        TapeSingleThreadInterface<TapeReadTask> & tapeReader,
        DiskWriteThreadPool & diskWriter,
        client::ClientInterface& client,
        uint64_t maxFiles,
        uint64_t byteSizeThreshold,
        castor::log::LogContext lc)
  : m_thread(*this),          // the worker thread keeps a reference to its injector
    m_memManager(mm),
    m_tapeReader(tapeReader),
    m_diskWriter(diskWriter),
    m_client(client),
    m_lc(lc),
    m_maxFiles(maxFiles),
    m_maxBytes(byteSizeThreshold)
{
}
47
//------------------------------------------------------------------------------
48
49
50
51
52
//destructor
//------------------------------------------------------------------------------
RecallTaskInjector::~RecallTaskInjector()
{
  // Intentionally empty: this destructor performs no explicit work.
}
//------------------------------------------------------------------------------
53
54
//finish
//------------------------------------------------------------------------------
55

56
void RecallTaskInjector::finish(){
57
58
59
  castor::tape::threading::MutexLocker ml(&m_producerProtection);
  m_queue.push(Request());
}
60
61
62
//------------------------------------------------------------------------------
//requestInjection
//------------------------------------------------------------------------------
63
void RecallTaskInjector::requestInjection(bool lastCall) {
64
  //@TODO where shall we  acquire the lock ? There of just before the push ?
65
  castor::tape::threading::MutexLocker ml(&m_producerProtection);
66
  m_queue.push(Request(m_maxFiles, m_maxBytes, lastCall));
67
}
68
69
70
//------------------------------------------------------------------------------
//waitThreads
//------------------------------------------------------------------------------
71
72
73
void RecallTaskInjector::waitThreads() {
  m_thread.wait();
}
74
75
76
//------------------------------------------------------------------------------
//startThreads
//------------------------------------------------------------------------------
77
78
79
void RecallTaskInjector::startThreads() {
  m_thread.start();
}
80
81
82
//------------------------------------------------------------------------------
//injectBulkRecalls
//------------------------------------------------------------------------------
83
84
85
86
/**
 * Creates one DiskWriteTask and one TapeReadTask per file of the given list
 * and hands them to the disk writer and the tape reader, logging each file
 * on the way.
 *
 * @param jobs the files to recall; each task works on its own clone of the
 *        corresponding structure, the listed structures themselves are only
 *        read
 */
void RecallTaskInjector::injectBulkRecalls(const std::vector<castor::tape::tapegateway::FileToRecallStruct*>& jobs) {
  typedef std::vector<tapegateway::FileToRecallStruct*>::size_type JobIndex;

  for (JobIndex i = 0; i < jobs.size(); ++i) {
    tapegateway::FileToRecallStruct& job = *jobs[i];

    // These parameters decorate the log context for this iteration only:
    // they are removed again when the array goes out of scope.
    LogContext::ScopedParam fileParams[]={
      LogContext::ScopedParam(m_lc, Param("NSHOSTNAME", job.nshost())),
      LogContext::ScopedParam(m_lc, Param("NSFILEID", job.fileid())),
      LogContext::ScopedParam(m_lc, Param("fSeq", job.fseq())),
      LogContext::ScopedParam(m_lc, Param("blockID", blockID(job))),
      LogContext::ScopedParam(m_lc, Param("path", job.path()))
    };
    tape::utils::suppresUnusedVariable(fileParams);

    m_lc.log(LOG_INFO, "Logged file to recall");

    // Each task gets its own clone, detached from the owning list so that the
    // clone does not try to unregister itself from that list when destroyed.
    tapegateway::FileToRecallStruct* const fileForDisk =
      removeOwningList(dynamic_cast<tape::tapegateway::FileToRecallStruct*>(job.clone()));
    DiskWriteTask* const diskTask = new DiskWriteTask(fileForDisk, m_memManager);

    tapegateway::FileToRecallStruct* const fileForTape =
      removeOwningList(dynamic_cast<tape::tapegateway::FileToRecallStruct*>(job.clone()));
    // The read task is given a reference to the write task created just above.
    TapeReadTask* const tapeTask = new TapeReadTask(fileForTape, *diskTask, m_memManager);

    m_diskWriter.push(diskTask);
    m_tapeReader.push(tapeTask);
  }

  LogContext::ScopedParam fileCountParam(m_lc, Param("nbFile", jobs.size()));
  m_lc.log(LOG_INFO, "Tasks for recalling injected");
}
114
115
116
//------------------------------------------------------------------------------
//synchronousInjection
//------------------------------------------------------------------------------
117
bool RecallTaskInjector::synchronousInjection()
118
{
119
  client::ClientProxy::RequestReport reqReport;  
David COME's avatar
David COME committed
120

121
  std::auto_ptr<castor::tape::tapegateway::FilesToRecallList> 
122
123
124
125
126
127
128
129
130
131
132
133
    filesToRecallList;
  try {
    filesToRecallList.reset(m_client.getFilesToRecall(m_maxFiles,m_maxBytes,reqReport));
  } catch (castor::exception::Exception & ex) {
    castor::log::ScopedParamContainer scoped(m_lc);
    scoped.add("transactionId", reqReport.transactionId)
          .add("byteSizeThreshold",m_maxBytes)
          .add("maxFiles", m_maxFiles)
          .add("message", ex.getMessageValue());
    m_lc.log(LOG_ERR, "Failed to getFiledToRecall.");
    return false;
  }
134
135
  castor::log::ScopedParamContainer scoped(m_lc); 
  scoped.add("sendRecvDuration", reqReport.sendRecvDuration)
136
        .add("byteSizeThreshold",m_maxBytes)
137
138
        .add("transactionId", reqReport.transactionId)
        .add("maxFiles", m_maxFiles);
139
  if(NULL==filesToRecallList.get()) { 
140
    m_lc.log(LOG_ERR, "No files to recall: empty mount");
141
142
143
    return false;
  }
  else {
144
    std::vector<tapegateway::FileToRecallStruct*>& jobs= filesToRecallList->filesToRecall();
145
146
147
148
    injectBulkRecalls(jobs);
    return true;
  }
}
149
150
151
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
152
153
void RecallTaskInjector::WorkerThread::run()
{
David COME's avatar
David COME committed
154
  using castor::log::LogContext;
155
156
  _this.m_lc.pushOrReplace(Param("thread", "recallTaskInjector"));
  _this.m_lc.log(LOG_DEBUG, "Starting RecallTaskInjector thread");
David COME's avatar
David COME committed
157
  
158
159
160
161
162
  while (1) {
    Request req = _this.m_queue.pop();
    _this.m_lc.log(LOG_INFO,"RecallJobInjector:run: about to call client interface\n");
    client::ClientProxy::RequestReport reqReport;
    std::auto_ptr<tapegateway::FilesToRecallList> filesToRecallList(_this.m_client.getFilesToRecall(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
David COME's avatar
David COME committed
163
        
164
165
166
167
168
169
170
171
172
173
174
175
    LogContext::ScopedParam sp01(_this.m_lc, Param("transactionId", reqReport.transactionId));
    LogContext::ScopedParam sp02(_this.m_lc, Param("connectDuration", reqReport.connectDuration));
    LogContext::ScopedParam sp03(_this.m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
    
    if (NULL == filesToRecallList.get()) {
      if (req.lastCall) {
        _this.m_lc.log(LOG_INFO,"No more file to recall: triggering the end of session.\n");
        _this.m_tapeReader.finish();
        _this.m_diskWriter.finish();
        break;
      } else {
        _this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): got empty list, but not last call. NoOp.\n");
176
      }
177
178
179
180
181
182
    } else {
      std::vector<tapegateway::FileToRecallStruct*>& jobs= filesToRecallList->filesToRecall();
      _this.injectBulkRecalls(jobs);
    }
  } // end of while(1)
  //-------------
183
  _this.m_lc.log(LOG_DEBUG, "Finishing RecallTaskInjector thread");
184
185
186
187
188
  /* We want to finish at the first lastCall we encounter.
   * But even after sending finish() to m_diskWriter and to m_tapeReader,
   * m_diskWriter might still want some more task (the threshold could be crossed),
   * so we discard everything that might still be in the queue
   */
189
190
191
192
193
194
195
196
197
198
  if(_this.m_queue.size()>0) {
    bool stillReading =true;
    while(stillReading) {
      Request req = _this.m_queue.pop();
      if (req.end){
        stillReading = false;
      }
      LogContext::ScopedParam sp(_this.m_lc, Param("lastCall", req.lastCall));
      _this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request");
    }
199
  }
200
201
202
203
204
205
}

} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor