RecallTaskInjector.cpp 6.26 KB
Newer Older
1
#include "castor/tape/tapeserver/daemon/RecallTaskInjector.hpp"
2
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
David COME's avatar
David COME committed
3
#include "castor/log/LogContext.hpp"
4
5
6
#include "castor/tape/tapegateway/FilesToRecallList.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
7
#include "log.h"
8
9
#include <stdint.h>

David COME's avatar
David COME committed
10
11
12
using castor::log::LogContext;
using castor::log::Param;

13
14
15
16
17
18
namespace{
  castor::tape::tapegateway::FileToRecallStruct* removeOwningList(castor::tape::tapegateway::FileToRecallStruct* ptr){
    ptr->setFilesToRecallList(NULL);
    return ptr;
  }
}
19

20
21
22
23
24
namespace castor{
namespace tape{
namespace tapeserver{
namespace daemon {
  
25
RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm, 
26
        TapeSingleThreadInterface<TapeReadTask> & tapeReader,
David COME's avatar
David COME committed
27
        DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,
28
        client::ClientInterface& client,castor::log::LogContext lc) : 
29
30
        m_thread(*this),m_memManager(mm),
        m_tapeReader(tapeReader),m_diskWriter(diskWriter),
31
        m_client(client),m_lc(lc)
32
33
{}

David COME's avatar
David COME committed
34
void RecallTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
35
  //@TODO where shall we  acquire the lock ? There of just before the push ?
36
37
  castor::tape::threading::MutexLocker ml(&m_producerProtection);
  m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
38
39
40
41
42
43
44
45
46
47
}

void RecallTaskInjector::waitThreads() {
  m_thread.wait();
}

void RecallTaskInjector::startThreads() {
  m_thread.start();
}

48
49
50
51
void RecallTaskInjector::injectBulkRecalls(const std::vector<castor::tape::tapegateway::FileToRecallStruct*>& jobs) {
  for (std::vector<tapegateway::FileToRecallStruct*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {

    LogContext::ScopedParam sp[]={
52
      LogContext::ScopedParam(m_lc, Param("NSHOSTNAME", (*it)->nshost())),
53
      LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->fileid())),
54
55
56
      LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->fseq())),
      LogContext::ScopedParam(m_lc, Param("blockID", blockID(**it))),
      LogContext::ScopedParam(m_lc, Param("path", (*it)->path()))
57
    };
58
    tape::utils::suppresUnusedVariable(sp);
David COME's avatar
David COME committed
59
60
61
    
    m_lc.log(LOG_INFO, "Logged file to recall");
    
62
63
64
65
66
67
68
69
70
71
72
    DiskWriteTask * dwt = 
      new DiskWriteTask(
        removeOwningList(dynamic_cast<tape::tapegateway::FileToRecallStruct*>((*it)->clone())), 
        m_memManager);
    TapeReadFileTask * trt = 
      new TapeReadFileTask(
        removeOwningList(
          dynamic_cast<tape::tapegateway::FileToRecallStruct*>((*it)->clone())), 
          *dwt,
          m_memManager);
    
73
74
75
    m_diskWriter.push(dwt);
    m_tapeReader.push(trt);
  }
David COME's avatar
David COME committed
76
  LogContext::ScopedParam sp03(m_lc, Param("nbFile", jobs.size()));
77
  m_lc.log(LOG_INFO, "Tasks for recalling injected");
78
79
}

David COME's avatar
David COME committed
80
bool RecallTaskInjector::synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold)
81
{
82
  client::ClientProxy::RequestReport reqReport;  
David COME's avatar
David COME committed
83

84
85
86
87
88
89
90
91
  std::auto_ptr<castor::tape::tapegateway::FilesToRecallList> filesToRecallList(m_client.getFilesToRecall(maxFiles,byteSizeThreshold,reqReport));
  LogContext::ScopedParam sp[]={
    LogContext::ScopedParam(m_lc, Param("maxFiles", maxFiles)),
    LogContext::ScopedParam(m_lc, Param("byteSizeThreshold",byteSizeThreshold)),
    LogContext::ScopedParam(m_lc, Param("transactionId", reqReport.transactionId)),
    LogContext::ScopedParam(m_lc, Param("connectDuration", reqReport.connectDuration)),
    LogContext::ScopedParam(m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration))
  };
92
  tape::utils::suppresUnusedVariable(sp);
93
94
  
  if(NULL==filesToRecallList.get()) { 
David COME's avatar
David COME committed
95
    m_lc.log(LOG_ERR, "Get called but no files to retrieve");
96
97
98
    return false;
  }
  else {
99
    std::vector<tapegateway::FileToRecallStruct*>& jobs= filesToRecallList->filesToRecall();
100
101
102
103
    injectBulkRecalls(jobs);
    return true;
  }
}
David COME's avatar
David COME committed
104

105
106
107
//--------------------------------------
void RecallTaskInjector::WorkerThread::run()
{
David COME's avatar
David COME committed
108
  using castor::log::LogContext;
109
110
  _this.m_lc.pushOrReplace(Param("thread", "recallTaskInjector"));
  _this.m_lc.log(LOG_DEBUG, "Starting RecallTaskInjector thread");
David COME's avatar
David COME committed
111
  
112
113
      while (1) {
        Request req = _this.m_queue.pop();
114
	_this.m_lc.log(LOG_INFO,"RecallJobInjector:run: about to call client interface\n");
115
        client::ClientProxy::RequestReport reqReport;
116
        std::auto_ptr<tapegateway::FilesToRecallList> filesToRecallList(_this.m_client.getFilesToRecall(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
David COME's avatar
David COME committed
117
118
119
120
121
        
        LogContext::ScopedParam sp01(_this.m_lc, Param("transactionId", reqReport.transactionId));
        LogContext::ScopedParam sp02(_this.m_lc, Param("connectDuration", reqReport.connectDuration));
        LogContext::ScopedParam sp03(_this.m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
  
122
        if (NULL == filesToRecallList.get()) {
123
          if (req.lastCall) {
David COME's avatar
David COME committed
124
	    _this.m_lc.log(LOG_INFO,"No more file to recall: triggering the end of session.\n");
125
126
127
128
            _this.m_tapeReader.finish();
            _this.m_diskWriter.finish();
            break;
          } else {
129
	     _this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): got empty list, but not last call. NoOp.\n");
130
131
	  }
        } else {
132
          std::vector<tapegateway::FileToRecallStruct*>& jobs= filesToRecallList->filesToRecall();
133
134
135
136
          _this.injectBulkRecalls(jobs);
        }
      } // end of while(1)
      //-------------
137
138
139
140
141
142
  
     /* We want to finish at the first lastCall we encounter.
      * But even after sending finish() to m_diskWriter and to m_tapeReader,
        m_diskWriter might still want some more task (the threshold could be crossed),
      *  so we discard everything that might still be in the queue
      */
143
144
145
      try {
        while(1) {
          Request req = _this.m_queue.tryPop();
146
147
          LogContext::ScopedParam sp(_this.m_lc, Param("lastCall", req.lastCall));
          _this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request");
148
149
        }
      } catch (castor::tape::threading::noMore) {
150
        _this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): Drained the request queue. We're now empty. Finishing");
151
      }
152
  _this.m_lc.log(LOG_DEBUG, "Finishing RecallTaskInjector thread");
153
154
155
156
157
158
}

} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor