Commit 44c37b8f authored by David COME's avatar David COME
Browse files

Added an end mechanism RecallTaskInjector

parent 82fb4939
......@@ -30,6 +30,11 @@ RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
m_client(client),m_lc(lc)
{}
void RecallTaskInjector::end(){
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request());
}
void RecallTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
//@TODO where shall we acquire the lock ? There of just before the push ?
castor::tape::threading::MutexLocker ml(&m_producerProtection);
......@@ -106,46 +111,44 @@ void RecallTaskInjector::WorkerThread::run()
{
using castor::log::LogContext;
while (1) {
Request req = _this.m_queue.pop();
_this.m_lc.log(LOG_INFO,"RecallJobInjector:run: about to call client interface\n");
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToRecallList> filesToRecallList(_this.m_client.getFilesToRecall(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
while (1) {
Request req = _this.m_queue.pop();
_this.m_lc.log(LOG_INFO,"RecallJobInjector:run: about to call client interface\n");
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToRecallList> filesToRecallList(_this.m_client.getFilesToRecall(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
LogContext::ScopedParam sp01(_this.m_lc, Param("transactionId", reqReport.transactionId));
LogContext::ScopedParam sp02(_this.m_lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp03(_this.m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
if (NULL == filesToRecallList.get()) {
if (req.lastCall) {
_this.m_lc.log(LOG_INFO,"No more file to recall: triggering the end of session.\n");
_this.m_tapeReader.finish();
_this.m_diskWriter.finish();
break;
} else {
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): got empty list, but not last call. NoOp.\n");
}
} else {
std::vector<tapegateway::FileToRecallStruct*>& jobs= filesToRecallList->filesToRecall();
_this.injectBulkRecalls(jobs);
}
} // end of while(1)
//-------------
/* We want to finish at the first lastCall we encounter.
* But even after sending finish() to m_diskWriter and to m_tapeReader,
m_diskWriter might still want some more task (the threshold could be crossed),
* so we discard everything that might still be in the queue
*/
try {
while(1) {
Request req = _this.m_queue.tryPop();
LogContext::ScopedParam sp(_this.m_lc, Param("lastCall", req.lastCall));
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request");
}
} catch (castor::tape::threading::noMore) {
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): Drained the request queue. We're now empty. Finishing");
LogContext::ScopedParam sp01(_this.m_lc, Param("transactionId", reqReport.transactionId));
LogContext::ScopedParam sp02(_this.m_lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp03(_this.m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
if (NULL == filesToRecallList.get()) {
if (req.lastCall) {
_this.m_lc.log(LOG_INFO,"No more file to recall: triggering the end of session.\n");
_this.m_tapeReader.finish();
_this.m_diskWriter.finish();
break;
} else {
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): got empty list, but not last call. NoOp.\n");
}
} else {
std::vector<tapegateway::FileToRecallStruct*>& jobs= filesToRecallList->filesToRecall();
_this.injectBulkRecalls(jobs);
}
} // end of while(1)
//-------------
/* We want to finish at the first lastCall we encounter.
* But even after sending finish() to m_diskWriter and to m_tapeReader,
* m_diskWriter might still want some more task (the threshold could be crossed),
* so we discard everything that might still be in the queue
*/
bool stillReading =true;
while(stillReading) {
Request req = _this.m_queue.pop();
stillReading = req.end;
LogContext::ScopedParam sp(_this.m_lc, Param("lastCall", req.lastCall));
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request");
}
}
} //end namespace daemon
......
......@@ -85,6 +85,8 @@ public:
* Start the inner thread
*/
void startThreads();
void end();
private:
/**
......@@ -103,8 +105,11 @@ private:
class Request {
public:
Request(int mf, int mb, bool lc):
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc) {}
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc),end(false) {}
Request():
nbMaxFiles(-1), byteSizeThreshold(-1), lastCall(true),end(true) {}
const int nbMaxFiles;
const int byteSizeThreshold;
......@@ -114,6 +119,8 @@ private:
* and can send into all the different threads a signal .
*/
const bool lastCall;
const bool end;
};
class WorkerThread: public castor::tape::threading::Thread {
......
......@@ -32,6 +32,7 @@ namespace daemon {
class TaskInjector{
public:
virtual void requestInjection(int maxFiles, int maxBlocks, bool lastCall) = 0;
virtual void end() = 0;
virtual ~TaskInjector() {}
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment