Commit fbc8927d authored by David COME's avatar David COME
Browse files

Merge branch 'tapeserver' of https://git.cern.ch/kerberos/CASTOR into tapeserver

Conflicts:
	castor/tape/tapeserver/daemon/MountSession.cpp
	castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp
parents 6f2f2ccc 24528285
......@@ -38,7 +38,8 @@ namespace tapeserver {
namespace daemon {
/**
* The memory manager is responsible for allocating memory blocks and distributing
* the free ones around to any class in need.
* the free ones around to any class in need. The distribution is actively run in
* a thread.
*/
class MigrationMemoryManager: private castor::tape::threading::Thread {
public:
......
......@@ -107,6 +107,27 @@ namespace daemon {
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
}
}
bool MigrationTaskInjector::synchronousInjection(uint64_t maxFiles,
uint64_t byteSizeThreshold) {
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList>
filesToMigrateList(m_client.getFilesToMigrate(maxFiles,
byteSizeThreshold,reqReport));
if(NULL == filesToMigrateList.get()) {
m_lc.log(LOG_ERR, "No files to migrate: empty mount");
return false;
} else {
std::vector<tapegateway::FileToMigrateStruct*>& jobs=filesToMigrateList->filesToMigrate();
injectBulkMigrations(jobs);
return true;
}
}
void MigrationTaskInjector::finish(){
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request());
}
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
......@@ -148,8 +169,21 @@ namespace daemon {
while(m_parent.m_queue.size()>0){
m_parent.m_queue.pop();
}
}
} // end of while(1)
//-------------
m_parent.m_lc.log(LOG_DEBUG, "Finishing MigrationTaskInjector thread");
/* We want to finish at the first lastCall we encounter.
* But even after sending finish() to m_diskWriter and to m_tapeReader,
* m_diskWriter might still want some more task (the threshold could be crossed),
* so we discard everything that might still be in the queue
*/
bool stillReading =true;
while(stillReading) {
Request req = m_parent.m_queue.pop();
if (req.end) stillReading = false;
LogContext::ScopedParam sp(m_parent.m_lc, Param("lastCall", req.lastCall));
m_parent.m_lc.log(LOG_INFO,"In MigrationTaskInjector::WorkerThread::run(): popping extra request");
}
}
......
......@@ -67,8 +67,33 @@ public:
* Start the inner thread
*/
void startThreads();
/**
* Function for a feed-back loop purpose between MigrationTaskInjector and
* DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects
* it has not enough jobs to do to, it is class to push a request
* in order to (try) fill up the queue.
* @param maxFiles files count requested.
* @param maxBlocks total bytes count at least requested
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
/**
* Contact the client to make sure there are really something to do
* Something = migration at most maxFiles or at least maxBytes
*
* @param maxFiles files count requested.
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold);
/**
* Send an end token in the request queue. There should be no subsequent
* calls to requestInjection.
*/
void finish();
private:
/*Compute how many blocks are needed for a file of fileSize bytes*/
......@@ -86,7 +111,10 @@ private:
class Request {
public:
Request(int mf, int mb, bool lc):
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc) {}
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc), end(false) {}
Request():
nbMaxFiles(-1), byteSizeThreshold(-1), lastCall(true),end(true) {}
const int nbMaxFiles;
const int byteSizeThreshold;
......@@ -97,6 +125,11 @@ private:
* and can send into all the different threads a signal .
*/
const bool lastCall;
/**
* True indicates the task injector will not receive any more request.
*/
const bool end;
};
class WorkerThread: public castor::tape::threading::Thread {
......
......@@ -37,6 +37,9 @@
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "RecallTaskInjector.hpp"
#include "RecallReportPacker.hpp"
#include "TapeWriteSingleThread.hpp"
#include "DiskReadThreadPool.hpp"
#include "MigrationTaskInjector.hpp"
using namespace castor::tape;
using namespace castor::log;
......@@ -120,9 +123,141 @@ throw (castor::tape::Exception) {
}
void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc) {
// We are ready to start the session. We need to create the whole machinery
// in order to get the task injector ready to check if we actually have a
// file to recall.
// findDrive does not throw exceptions (it catches them to log errors)
// A NULL pointer is returned on failure
std::auto_ptr<castor::tape::drives::DriveInterface> drive(findDrive(lc));
if(!drive.get()) return;
// We can now start instantiating all the components of the data path
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz,lc);
TapeReadSingleThread trst(*drive, m_volInfo.vid,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles, lc);
RecallReportPacker rrp(m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
lc);
DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
rrp,
lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy, lc);
trst.setTaskInjector(&rti);
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
if (rti.synchronousInjection(m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
// We got something to recall. Time to start the machinery
trst.startThreads();
dwtp.startThreads();
rrp.startThreads();
rti.startThreads();
// This thread is now going to be idle until the system unwinds at the end
// of the session
// All client notifications are done by the report packer, including the
// end of session
rti.waitThreads();
rrp.waitThread();
dwtp.waitThreads();
trst.waitThreads();
} else {
// Just log this was an empty mount and that's it. The memory management
// will be deallocated automatically.
lc.log(LOG_ERR, "Aborting recall mount startup: empty mount");
LogContext::ScopedParam sp1(lc, Param("errorMessage", "Aborted: empty recall mount"));
LogContext::ScopedParam sp2(lc, Param("errorCode", SEINTERNAL));
try {
client::ClientProxy::RequestReport reqReport;
m_clientProxy.reportEndOfSessionWithError("Aborted: empty recall mount", SEINTERNAL, reqReport);
LogContext::ScopedParam sp08(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp09(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp10(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp11(lc, Param("errorMessage", "Aborted: empty recall mount"));
LogContext::ScopedParam sp12(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
} catch(castor::exception::Exception & ex) {
LogContext::ScopedParam sp1(lc, Param("notificationError", ex.getMessageValue()));
lc.log(LOG_ERR, "Failed to notified client of end session with error");
}
}
}
}
void castor::tape::tapeserver::daemon::MountSession::executeWrite(LogContext & lc) {
// We are ready to start the session. We need to create the whole machinery
// in order to get the task injector ready to check if we actually have a
// file to migrate.
// 1) Get hold of the drive error logs are done inside the findDrive function
std::auto_ptr<castor::tape::drives::DriveInterface> drive(findDrive(lc));
if (!drive.get()) return;
// Once we got hold of the drive, we can run the session
{
MemoryManager mm(m_castorConf.rtcopydNbBufs,
m_castorConf.rtcopydBufsz,lc);
MigrationReportPacker mrp(m_clientProxy,
lc);
TapeWriteSingleThread twst(*drive.get(),
m_volInfo.vid,
lc,
mrp,
m_castorConf.tapebridgeMaxFilesBeforeFlush,
m_castorConf.tapebridgeMaxBytesBeforeFlush/m_castorConf.rtcopydBufsz);
DiskReadThreadPool drtp(m_castorConf.tapeserverdDiskThreads,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
lc);
MigrationTaskInjector mti(mm, drtp, twst, m_clientProxy, lc);
if (mti.synchronousInjection(m_castorConf.tapebridgeBulkRequestMigrationMaxBytes,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles)) {
// We have something to do: start the session by starting all the
// threads.
mm.startThreads();
drtp.startThreads();
twst.startThreads();
mrp.startThreads();
mti.startThreads();
// Synchronise with end of threads
mti.waitThreads();
mrp.waitThread();
twst.waitThreads();
drtp.waitThreads();
mm.waitThreads();
} else {
// Just log this was an empty mount and that's it. The memory management
// will be deallocated automatically.
lc.log(LOG_ERR, "Aborting migration mount startup: empty mount");
LogContext::ScopedParam sp1(lc, Param("errorMessage", "Aborted: empty recall mount"));
LogContext::ScopedParam sp2(lc, Param("errorCode", SEINTERNAL));
try {
client::ClientProxy::RequestReport reqReport;
m_clientProxy.reportEndOfSessionWithError("Aborted: empty migration mount", SEINTERNAL, reqReport);
LogContext::ScopedParam sp1(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp2(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp3(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
lc.log(LOG_ERR, "Notified client of end session with error");
} catch(castor::exception::Exception & ex) {
LogContext::ScopedParam sp1(lc, Param("notificationError", ex.getMessageValue()));
lc.log(LOG_ERR, "Failed to notified client of end session with error");
}
}
}
}
void castor::tape::tapeserver::daemon::MountSession::executeDump(LogContext & lc) {
// We are ready to start the session. In case of read there is no interest in
// creating the machinery before getting the tape mounted, so do it now.
// 1) Get hold of the drive and check it.
}
castor::tape::drives::DriveInterface *
castor::tape::tapeserver::daemon::MountSession::findDrive(LogContext& lc) {
// 1) Get hold of the drive and check it.
utils::TpconfigLines::const_iterator configLine;
for (configLine = m_tpConfig.begin(); configLine != m_tpConfig.end(); configLine++) {
if (configLine->unitName == m_request.driveUnit && configLine->density == m_volInfo.density) {
......@@ -144,7 +279,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp12(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp13(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
}
// Actually find the drive.
castor::tape::SCSI::DeviceVector dv(m_sysWrapper);
......@@ -153,7 +288,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
driveInfo = dv.findBySymlink(configLine->devFilename);
} catch (castor::tape::SCSI::DeviceVector::NotFound & e) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
LogContexdoist::ScopedParam sp08(lc, Param("density", m_volInfo.density));
LogContext::ScopedParam sp09(lc, Param("devFilename", configLine->devFilename));
lc.log(LOG_ERR, "Drive not found on this path");
......@@ -167,10 +302,16 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
} catch (castor::exception::Exception & e) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
LogContext::Set the task injector ready to check if we actually have a
// file to recall.
// findDrive does not throw exceptions (it catches them to log errors)
// A NULL pointer is returned on failure
std::auto_ptr<castor::tape::drives::DriveInterface> drive(findDrive(lc));
if(!drive.get()) return;
// We can now startcopedParam sp08(lc, Param("density", m_volInfo.density));
LogContext::ScopedParam sp09(lc, Param("devFilename", configLine->devFilename));
LogContext::ScopedParam sp10(lc, Param("errorMessage", e.getMessageValue()));
lc.log(LOG_ERR, "Error looking to path to tape drive");
......@@ -185,7 +326,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp14(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp15(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
} catch (...) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
......@@ -202,11 +343,10 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
}
std::auto_ptr<castor::tape::drives::DriveInterface> drive;
try {
drive.reset(castor::tape::drives::DriveFactory(driveInfo, m_sysWrapper));
return castor::tape::drives::DriveFactory(driveInfo, m_sysWrapper);
} catch (castor::exception::Exception & e) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
......@@ -224,7 +364,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp14(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp15(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
} catch (...) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
......@@ -241,65 +381,6 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
}
// We can now start instantiating all the components of the data path
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz,lc);
TapeReadSingleThread trst(*drive, m_volInfo.vid,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles, lc);
RecallReportPacker rrp(m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
lc);
DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
rrp,
lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy, lc);
trst.setTaskInjector(&rti);
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
if (rti.synchronousInjection(m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
// We got something to recall. Time to start the machinery
trst.startThreads();
dwtp.startThreads();
rrp.startThreads();
rti.startThreads();
// This thread is now going to be idle until the system unwinds at the end
// of the session
// All client notifications are done by the report packer, including the
// end of session
rti.waitThreads();
rrp.waitThread();
dwtp.waitThreads();
trst.waitThreads();
} else {
// Just log this was an empty mount and that's it. The memory management
// will be deallocated automatically.
lc.log(LOG_ERR, "Aborting recall mount startup: empty mount");
client::ClientProxy::RequestReport reqReport;
m_clientProxy.reportEndOfSessionWithError("Aborted: empty recall mount", SEINTERNAL, reqReport);
LogContext::ScopedParam sp08(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp09(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp10(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp11(lc, Param("errorMessage", "Aborted: empty recall mount"));
LogContext::ScopedParam sp12(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
}
}
}
void castor::tape::tapeserver::daemon::MountSession::executeWrite(LogContext & lc) {
}
void castor::tape::tapeserver::daemon::MountSession::executeDump(LogContext & lc) {
// We are ready to start the session. In case of read there is no interest in
// creating the machinery before getting the tape mounted, so do it now.
// 1) Get hold of the drive and check it.
}
......@@ -57,6 +57,8 @@ namespace daemon {
tapebridgeBulkRequestMigrationMaxFiles(0),
tapebridgeBulkRequestRecallMaxBytes(0),
tapebridgeBulkRequestRecallMaxFiles(0),
tapebridgeMaxBytesBeforeFlush(0),
tapebridgeMaxFilesBeforeFlush(0),
tapeserverdDiskThreads(0) {}
uint32_t rfioConnRetry;
uint32_t rfioConnRetryInt;
......@@ -70,6 +72,8 @@ namespace daemon {
uint64_t tapebridgeBulkRequestMigrationMaxFiles;
uint64_t tapebridgeBulkRequestRecallMaxBytes;
uint64_t tapebridgeBulkRequestRecallMaxFiles;
uint64_t tapebridgeMaxBytesBeforeFlush;
uint64_t tapebridgeMaxFilesBeforeFlush;
// Other values found on production tape servers
// TAPE CRASHED_RLS_HANDLING RETRY
// TAPE CRASHED_RLS_HANDLING_RETRIES 3
......@@ -107,6 +111,10 @@ namespace daemon {
System::virtualWrapper & m_sysWrapper;
const utils::TpconfigLines & m_tpConfig;
const CastorConf & m_castorConf;
/** utility to find the drive on the system. This function logs
* all errors and hence does not throw exceptions. It returns NULL
* in case of failure. */
castor::tape::drives::DriveInterface * findDrive(LogContext & lc);
/** sub-part of execute for the read sessions */
void executeRead(LogContext & lc);
/** sub-part of execute for a write session */
......
......@@ -86,7 +86,8 @@ bool RecallTaskInjector::synchronousInjection(uint64_t maxFiles, uint64_t byteSi
{
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<castor::tape::tapegateway::FilesToRecallList> filesToRecallList(m_client.getFilesToRecall(maxFiles,byteSizeThreshold,reqReport));
std::auto_ptr<castor::tape::tapegateway::FilesToRecallList>
filesToRecallList(m_client.getFilesToRecall(maxFiles,byteSizeThreshold,reqReport));
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("maxFiles", maxFiles)),
LogContext::ScopedParam(m_lc, Param("byteSizeThreshold",byteSizeThreshold)),
......@@ -97,7 +98,7 @@ bool RecallTaskInjector::synchronousInjection(uint64_t maxFiles, uint64_t byteSi
tape::utils::suppresUnusedVariable(sp);
if(NULL==filesToRecallList.get()) {
m_lc.log(LOG_ERR, "Get called but no files to retrieve");
m_lc.log(LOG_ERR, "No files to recall: empty mount");
return false;
}
else {
......
......@@ -56,7 +56,7 @@ public:
/**
* Function for a feed-back loop purpose between RecallTaskInjector and
* DiskWriteThreadPool. When DiskWriteThreadPool::popAndRequestMoreJobs detects
* TapeReadSingleThread. When TapeReadSingleThread::popAndRequestMoreJobs detects
* it has not enough jobs to do to, it is class to push a request
* in order to (try) fill up the queue.
* @param maxFiles files count requested.
......@@ -66,6 +66,12 @@ public:
*/
virtual void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
/**
* Send an end token in the request queue. There should be no subsequent
* calls to requestInjection.
*/
void finish();
/**
* Contact the client to make sure there are really something to do
* Something = recall at most maxFiles or at least maxBytes
......@@ -85,8 +91,6 @@ public:
* Start the inner thread
*/
void startThreads();
void finish();
private:
/**
......
......@@ -38,16 +38,15 @@ namespace tape {
namespace tapeserver {
namespace daemon {
class TapeWriteSingleThread : public TapeSingleThreadInterface<TapeWriteTaskInterface> {
class TapeWriteSingleThread : public TapeSingleThreadInterface<TapeWriteTask> {
public:
TapeWriteSingleThread(castor::tape::drives::DriveInterface & drive,
const std::string & vid,
castor::log::LogContext & lc,
MigrationReportPacker & repPacker,
castor::log::LogContext & lc, MigrationReportPacker & repPacker,
uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush):
TapeSingleThreadInterface<TapeWriteTaskInterface>(drive, vid, lc),
m_filesBeforeFlush(filesBeforeFlush),m_bytesBeforeFlush(bytesBeforeFlush),
m_drive(drive), m_reportPacker(repPacker), m_lastFseq(0), m_compress(0) {}
m_drive(drive), m_reportPacker(repPacker), m_lastFseq(0), m_compress(0) {}
private:
/**
......@@ -98,8 +97,8 @@ private:
{
// First we have to initialise the tape read session
std::auto_ptr<castor::tape::tapeFile::WriteSession> rs(openWriteSession());
uint64_t bytes=0;
uint64_t bytes=0;
uint64_t files=0;
std::auto_ptr<TapeWriteTaskInterface> task ;
while(1) {
......@@ -131,8 +130,9 @@ private:
}
}
const uint64_t m_filesBeforeFlush;
const uint64_t m_filesBeforeFlush;
const uint64_t m_bytesBeforeFlush;
castor::tape::drives::DriveInterface& m_drive;
MigrationReportPacker & m_reportPacker;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment