Commit a7f048b1 authored by David COME's avatar David COME
Browse files

Simplify interface of synchronous injectors (Mig+Rec)

the bytes/files numbers are specified at construction time and thus no longer necessary at the call
parent a15e87b1
......@@ -78,10 +78,10 @@ namespace daemon {
log::LogContext::ScopedParam sp0(m_lc, log::Param("m_maxBytesReq", m_maxBytesReq));
if(0==vrp.remaining){
m_injector->requestInjection(m_maxFilesReq, m_maxBytesReq,true);
m_injector->requestInjection(true);
m_lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (with last call)");
}else if(vrp.remaining + 1 == m_maxFilesReq/2){
m_injector->requestInjection(m_maxFilesReq, m_maxBytesReq,false);
m_injector->requestInjection(false);
m_lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (without last call)");
}
return vrp.value;
......
......@@ -46,9 +46,10 @@ namespace daemon {
MigrationTaskInjector::MigrationTaskInjector(MigrationMemoryManager & mm,
DiskThreadPoolInterface<DiskReadTaskInterface> & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
castor::log::LogContext lc):
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc):
m_thread(*this),m_memManager(mm),m_tapeWriter(tapeWriter),
m_diskReader(diskReader),m_client(client),m_lc(lc)
m_diskReader(diskReader),m_client(client),m_lc(lc),
m_maxFiles(maxFiles), m_maxByte(byteSizeThreshold)
{
}
......@@ -101,19 +102,18 @@ namespace daemon {
m_thread.start();
}
void MigrationTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
void MigrationTaskInjector::requestInjection( bool lastCall) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
if(!m_errorFlag) {
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
m_queue.push(Request(m_maxFiles, m_maxByte, lastCall));
}
}
bool MigrationTaskInjector::synchronousInjection(uint64_t maxFiles,
uint64_t byteSizeThreshold) {
bool MigrationTaskInjector::synchronousInjection() {
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList>
filesToMigrateList(m_client.getFilesToMigrate(maxFiles,
byteSizeThreshold,reqReport));
filesToMigrateList(m_client.getFilesToMigrate(m_maxFiles,
m_maxByte,reqReport));
if(NULL == filesToMigrateList.get()) {
m_lc.log(LOG_ERR, "No files to migrate: empty mount");
return false;
......
......@@ -49,7 +49,7 @@ public:
MigrationTaskInjector(MigrationMemoryManager & mm,
DiskThreadPoolInterface<DiskReadTaskInterface> & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
castor::log::LogContext lc);
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc);
/**
......@@ -77,7 +77,7 @@ public:
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
void requestInjection(bool lastCall);
/**
* Contact the client to make sure there are really something to do
......@@ -87,7 +87,7 @@ public:
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold);
bool synchronousInjection();
/**
* Send an end token in the request queue. There should be no subsequent
......@@ -156,6 +156,12 @@ private:
castor::tape::threading::BlockingQueue<Request> m_queue;
castor::tape::threading::AtomicFlag m_errorFlag;
//maximal number of files requested. at once
const uint64_t m_maxFiles;
//maximal number of cumulated byte requested. at once
const uint64_t m_maxByte;
};
} //end namespace daemon
......
......@@ -145,13 +145,14 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
rrp,
lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy, lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,lc);
trst.setTaskInjector(&rti);
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
if (rti.synchronousInjection(m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
if (rti.synchronousInjection()) {
// We got something to recall. Time to start the machinery
trst.startThreads();
dwtp.startThreads();
......@@ -211,9 +212,10 @@ void castor::tape::tapeserver::daemon::MountSession::executeWrite(LogContext & l
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
lc);
MigrationTaskInjector mti(mm, drtp, twst, m_clientProxy, lc);
if (mti.synchronousInjection(m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles)) {
MigrationTaskInjector mti(mm, drtp, twst, m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,lc);
if (mti.synchronousInjection()) {
// We have something to do: start the session by starting all the
// threads.
mm.startThreads();
......
......@@ -25,10 +25,11 @@ namespace daemon {
RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTaskInterface> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,
client::ClientInterface& client,castor::log::LogContext lc) :
client::ClientInterface& client,
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc) :
m_thread(*this),m_memManager(mm),
m_tapeReader(tapeReader),m_diskWriter(diskWriter),
m_client(client),m_lc(lc)
m_client(client),m_lc(lc),m_maxFiles(maxFiles),m_byteSizeThreshold(byteSizeThreshold)
{}
void RecallTaskInjector::finish(){
......@@ -36,10 +37,10 @@ void RecallTaskInjector::finish(){
m_queue.push(Request());
}
void RecallTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
void RecallTaskInjector::requestInjection(bool lastCall) {
//@TODO where shall we acquire the lock ? There of just before the push ?
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
m_queue.push(Request(m_maxFiles, m_byteSizeThreshold, lastCall));
}
void RecallTaskInjector::waitThreads() {
......@@ -82,15 +83,15 @@ void RecallTaskInjector::injectBulkRecalls(const std::vector<castor::tape::tapeg
m_lc.log(LOG_INFO, "Tasks for recalling injected");
}
bool RecallTaskInjector::synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold)
bool RecallTaskInjector::synchronousInjection()
{
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<castor::tape::tapegateway::FilesToRecallList>
filesToRecallList(m_client.getFilesToRecall(maxFiles,byteSizeThreshold,reqReport));
filesToRecallList(m_client.getFilesToRecall(m_maxFiles,m_byteSizeThreshold,reqReport));
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("maxFiles", maxFiles)),
LogContext::ScopedParam(m_lc, Param("byteSizeThreshold",byteSizeThreshold)),
LogContext::ScopedParam(m_lc, Param("maxFiles", m_maxFiles)),
LogContext::ScopedParam(m_lc, Param("byteSizeThreshold",m_byteSizeThreshold)),
LogContext::ScopedParam(m_lc, Param("transactionId", reqReport.transactionId)),
LogContext::ScopedParam(m_lc, Param("connectDuration", reqReport.connectDuration)),
LogContext::ScopedParam(m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration))
......
......@@ -24,9 +24,9 @@
#pragma once
#include <list>
#include <stdint.h>
#include <list>
#include "castor/tape/tapeserver/daemon/MemManager.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
......@@ -51,7 +51,7 @@ public:
RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface<TapeReadTaskInterface> & tapeReader,
DiskThreadPoolInterface<DiskWriteTaskInterface> & diskWriter,client::ClientInterface& client,
castor::log::LogContext lc);
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc);
/**
......@@ -59,12 +59,11 @@ public:
* TapeReadSingleThread. When TapeReadSingleThread::popAndRequestMoreJobs detects
* it has not enough jobs to do to, it is class to push a request
* in order to (try) fill up the queue.
* @param maxFiles files count requested.
* @param maxBlocks total bytes count at least requested
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
virtual void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
virtual void requestInjection(bool lastCall);
/**
* Send an end token in the request queue. There should be no subsequent
......@@ -80,7 +79,7 @@ public:
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold);
bool synchronousInjection();
/**
* Wait for the inner thread to finish
......@@ -108,14 +107,13 @@ private:
*/
class Request {
public:
Request(int mf, int mb, bool lc):
Request(uint64_t mf, uint64_t mb, bool lc):
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc),end(false) {}
Request():
nbMaxFiles(-1), byteSizeThreshold(-1), lastCall(true),end(true) {}
const int nbMaxFiles;
const int byteSizeThreshold;
nbMaxFiles(0), byteSizeThreshold(0), lastCall(true),end(true) {}
const uint64_t nbMaxFiles;
const uint64_t byteSizeThreshold;
/**
* True if it is the last call for the set of requests :it means
......@@ -149,6 +147,13 @@ private:
castor::tape::threading::Mutex m_producerProtection;
castor::tape::threading::BlockingQueue<Request> m_queue;
//maximal number of files requested. at once
const uint64_t m_maxFiles;
//maximal number of cumulated byte requested. at once
const uint64_t m_byteSizeThreshold;
};
} //end namespace daemon
......
......@@ -66,15 +66,15 @@ TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNominal) {
FakeDiskWriteThreadPool diskWrite;
FakeSingleTapeReadThread tapeRead(drive, "V12345", lc);
tapeserver::daemon::RecallReportPacker rrp(client,2,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,6,blockSize,lc);
ASSERT_EQ(true,rti.synchronousInjection(6,blockSize));
ASSERT_EQ(true,rti.synchronousInjection());
ASSERT_EQ(nbFile,diskWrite.m_tasks.size());
ASSERT_EQ(nbFile,tapeRead.m_tasks.size());
rti.startThreads();
rti.requestInjection(6,blockSize,false);
rti.requestInjection(6,blockSize,true);
rti.requestInjection(false);
rti.requestInjection(true);
rti.finish();
rti.waitThreads();
......@@ -110,9 +110,9 @@ TEST(castor_tape_tapeserver_daemon, RecallTaskInjectorNoFiles) {
FakeSingleTapeReadThread tapeRead(drive, "V12345", lc);
tapeserver::daemon::RecallReportPacker rrp(client,2,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,lc);
tapeserver::daemon::RecallTaskInjector rti(mm,tapeRead,diskWrite,client,6,blockSize,lc);
ASSERT_EQ(false,rti.synchronousInjection(6,blockSize));
ASSERT_EQ(false,rti.synchronousInjection());
ASSERT_EQ(0U,diskWrite.m_tasks.size());
ASSERT_EQ(0U,tapeRead.m_tasks.size());
}
......
......@@ -54,11 +54,11 @@ private:
// (the remaining value is after pop)
if(vrp.remaining + 1 == m_maxFilesRequest/2) {
// This is not a last call
m_taskInjector->requestInjection(m_maxFilesRequest, 1000, false);
m_taskInjector->requestInjection(false);
} else if (0 == vrp.remaining) {
// This is a last call: if the task injector comes up empty on this
// one, he'll call it the end.
m_taskInjector->requestInjection(m_maxFilesRequest, 1000, true);
m_taskInjector->requestInjection(true);
}
return vrp.value;
}
......
......@@ -31,7 +31,7 @@ namespace daemon {
class TaskInjector{
public:
virtual void requestInjection(int maxFiles, int maxBlocks, bool lastCall) = 0;
virtual void requestInjection(bool lastCall) = 0;
virtual void finish() = 0;
virtual ~TaskInjector() {}
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment