Commit 14b3dd00 authored by David COME's avatar David COME
Browse files

(WIP) first draft of MigrationTaskInjector

parent 67234363
......@@ -23,6 +23,7 @@
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
#include "castor/tape/tapegateway/FilesToMigrateList.hpp"
namespace{
......@@ -32,6 +33,9 @@ namespace{
}
}
using castor::log::LogContext;
using castor::log::Param;
namespace castor{
namespace tape{
namespace tapeserver{
......@@ -43,7 +47,7 @@ namespace daemon {
TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,client::ClientInterface& client,
castor::log::LogContext lc):
m_thread(*this),m_memManager(mm),m_tapeWriter(tapeWriter),
m_diskReader(diskReader),m_client(client)
m_diskReader(diskReader),m_client(client),m_lc(lc)
{
}
......@@ -51,17 +55,29 @@ namespace daemon {
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs
* @param jobs to transform into tasks
*/
void MigrationTaskInjector::injectBulkMigrations(const std::vector<tapegateway::FileToMigrateStruct*>& jobs){
const u_signed64 blockCapacity = m_memManager->blockCapacity();
for(const std::vector<tapegateway::FileToMigrateStruct*>::const_iterator it= jobs.begin();it!=jobs.end();++it){
const u_signed64 blockCapacity = m_memManager.blockCapacity();
for(std::vector<tapegateway::FileToMigrateStruct*>::const_iterator it= jobs.begin();it!=jobs.end();++it){
const u_signed64 fileSize = (*it)->fileSize();
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->fileid())),
LogContext::ScopedParam(m_lc, Param("NSFILESEQNUMBER", (*it)->fseq())),
LogContext::ScopedParam(m_lc, Param("NSFILENSHOST", (*it)->nshost())),
LogContext::ScopedParam(m_lc, Param("NSFILEPATH", (*it)->path()))
};
tape::utils::suppresUnusedVariable(sp);
m_lc.log(LOG_INFO, "Logged file to migrate");
const u_signed64 neededBlock = fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1);
TapeWriteTask *twt = new TapeWriteTask(,neededBlock,mm);
DiskReadTask *drt = new DiskReadTask(*twt,removeOwningList((*it)->clone()));
TapeWriteTask *twt = new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager);
DiskReadTask *drt = new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock);
m_tapeWriter.push(twt);
m_diskReader.push(drt);
}
}
......@@ -78,11 +94,34 @@ namespace daemon {
void MigrationTaskInjector::startThreads(){
m_thread.start();
}
void MigrationTaskInjector::requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall) {
//@TODO where shall we acquire the lock ? There of just before the push ?
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
}
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
while(1){
Request req = _this.m_queue.pop();
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList> filesToMigrateList(_this.m_client.getFilesToMigrate(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
MigrationTaskInjector::WorkerThread::WorkerThread(MigrationTaskInjector & rji): _this(rji) {}
void MigrationTaskInjector::WorkerThread::run(){}
if(NULL==filesToMigrateList.get()){
if (req.lastCall) {
_this.m_lc.log(LOG_INFO,"No more file to migrate: triggering the end of session.\n");
_this.m_tapeWriter.finish();
_this.m_diskReader.finish();
break;
} else {
_this.m_lc.log(LOG_INFO,"In MigrationTaskInjector::WorkerThread::run(): got empty list, but not last call");
}
} else {
_this.injectBulkMigrations(filesToMigrateList->filesToMigrate());
}
}
}
} //end namespace daemon
......
......@@ -33,7 +33,7 @@
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/daemon/TaskInjector.hpp"
namespace castor{
namespace tape{
namespace tapeserver{
......@@ -66,8 +66,32 @@ public:
* Start the inner thread
*/
void startThreads();
void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
private:
/**
* A request of files to migrate. We request EITHER
* - a maximum of nbMaxFiles files
* - OR at least byteSizeThreshold bytes.
* That means we stop as soon as we have nbMaxFiles files or the cumulated size
* is equal or above byteSizeThreshold.
*/
class Request {
public:
Request(int mf, int mb, bool lc):
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc) {}
const int nbMaxFiles;
const int byteSizeThreshold;
/**
* True if it is the last call for the set of requests :it means
* we don't need to try to get more files to recall
* and can send into all the different threads a signal .
*/
const bool lastCall;
};
class WorkerThread: public castor::tape::threading::Thread {
public:
WorkerThread(MigrationTaskInjector & rji): _this(rji) {}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment