Commit f831a93a authored by Eric Cano's avatar Eric Cano
Browse files

Factored out the drive detection part from MountSession::execeuteRead()

parent c3b96b49
......@@ -37,7 +37,8 @@ namespace tapeserver {
namespace daemon {
/**
* The memory manager is responsible for allocating memory blocks and distributing
* the free ones around to any class in need.
* the free ones around to any class in need. The distribution is actively run in
* a thread.
*/
class MemoryManager: private castor::tape::threading::Thread {
public:
......
......@@ -37,6 +37,7 @@
#include "castor/tape/tapeserver/drive/Drive.hpp"
#include "RecallTaskInjector.hpp"
#include "RecallReportPacker.hpp"
#include "TapeWriteSingleThread.hpp"
using namespace castor::tape;
using namespace castor::log;
......@@ -120,9 +121,95 @@ throw (castor::tape::Exception) {
}
void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc) {
// We are ready to start the session. We need to create the whole machinery
// in order to get the task injector ready to check if we actually have a
// file to recall.
// findDrive does not throw exceptions (it catches them to log errors)
// A NULL pointer is returned on failure
std::auto_ptr<castor::tape::drives::DriveInterface> drive(findDrive(lc));
if(!drive.get()) return;
// We can now start instantiating all the components of the data path
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz);
TapeReadSingleThread trst(*drive, m_volInfo.vid,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles, lc);
RecallReportPacker rrp(m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
lc);
DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
rrp,
lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy, lc);
trst.setTaskInjector(&rti);
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
if (rti.synchronousInjection(m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
// We got something to recall. Time to start the machinery
trst.startThreads();
dwtp.startThreads();
rrp.startThreads();
rti.startThreads();
// This thread is now going to be idle until the system unwinds at the end
// of the session
// All client notifications are done by the report packer, including the
// end of session
rti.waitThreads();
rrp.waitThread();
dwtp.waitThreads();
trst.waitThreads();
} else {
// Just log this was an empty mount and that's it. The memory management
// will be deallocated automatically.
lc.log(LOG_ERR, "Aborting recall mount startup: empty mount");
client::ClientProxy::RequestReport reqReport;
m_clientProxy.reportEndOfSessionWithError("Aborted: empty recall mount", SEINTERNAL, reqReport);
LogContext::ScopedParam sp08(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp09(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp10(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp11(lc, Param("errorMessage", "Aborted: empty recall mount"));
LogContext::ScopedParam sp12(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
}
}
}
void castor::tape::tapeserver::daemon::MountSession::executeWrite(LogContext & lc) {
// We are ready to start the session. We need to create the whole machinery
// in order to get the task injector ready to check if we actually have a
// file to migrate.
// 1) Get hold of the drive error logs are done inside the findDrive function
std::auto_ptr<castor::tape::drives::DriveInterface> drive(findDrive(lc));
if (!drive.get()) return;
// Once we got hold of the drive, we can run the session
{
// MemoryManager mm(m_castorConf.rtcopydNbBufs,
// m_castorConf.rtcopydBufsz);
// MigrationReportPacker mrp(m_clientProxy,
// lc);
// TapeWriteSingleThread twst(*drive.get(),
// m_volInfo.vid,
// lc,
// mrp,
// m_castorConf.<need the flushing variables here>)
}
}
void castor::tape::tapeserver::daemon::MountSession::executeDump(LogContext & lc) {
// We are ready to start the session. In case of read there is no interest in
// creating the machinery before getting the tape mounted, so do it now.
// 1) Get hold of the drive and check it.
}
castor::tape::drives::DriveInterface *
castor::tape::tapeserver::daemon::MountSession::findDrive(LogContext& lc) {
// 1) Get hold of the drive and check it.
utils::TpconfigLines::const_iterator configLine;
for (configLine = m_tpConfig.begin(); configLine != m_tpConfig.end(); configLine++) {
if (configLine->unitName == m_request.driveUnit && configLine->density == m_volInfo.density) {
......@@ -144,7 +231,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp12(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp13(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
}
// Actually find the drive.
castor::tape::SCSI::DeviceVector dv(m_sysWrapper);
......@@ -167,7 +254,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
} catch (castor::exception::Exception & e) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
......@@ -185,7 +272,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp14(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp15(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
} catch (...) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
......@@ -202,11 +289,10 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
}
std::auto_ptr<castor::tape::drives::DriveInterface> drive;
try {
drive.reset(castor::tape::drives::DriveFactory(driveInfo, m_sysWrapper));
return castor::tape::drives::DriveFactory(driveInfo, m_sysWrapper);
} catch (castor::exception::Exception & e) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
......@@ -224,7 +310,7 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp14(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp15(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
} catch (...) {
// We could not find this drive in the system's SCSI devices
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
......@@ -241,65 +327,6 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
LogContext::ScopedParam sp13(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp14(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
return NULL;
}
// We can now start instantiating all the components of the data path
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz);
TapeReadSingleThread trst(*drive, m_volInfo.vid,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles, lc);
RecallReportPacker rrp(m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
lc);
DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
rrp,
lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy, lc);
trst.setTaskInjector(&rti);
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
if (rti.synchronousInjection(m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes)) {
// We got something to recall. Time to start the machinery
trst.startThreads();
dwtp.startThreads();
rrp.startThreads();
rti.startThreads();
// This thread is now going to be idle until the system unwinds at the end
// of the session
// All client notifications are done by the report packer, including the
// end of session
rti.waitThreads();
rrp.waitThread();
dwtp.waitThreads();
trst.waitThreads();
} else {
// Just log this was an empty mount and that's it. The memory management
// will be deallocated automatically.
lc.log(LOG_ERR, "Aborting recall mount startup: empty mount");
client::ClientProxy::RequestReport reqReport;
m_clientProxy.reportEndOfSessionWithError("Aborted: empty recall mount", SEINTERNAL, reqReport);
LogContext::ScopedParam sp08(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp09(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp10(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp11(lc, Param("errorMessage", "Aborted: empty recall mount"));
LogContext::ScopedParam sp12(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
}
}
}
void castor::tape::tapeserver::daemon::MountSession::executeWrite(LogContext & lc) {
}
void castor::tape::tapeserver::daemon::MountSession::executeDump(LogContext & lc) {
// We are ready to start the session. In case of read there is no interest in
// creating the machinery before getting the tape mounted, so do it now.
// 1) Get hold of the drive and check it.
}
......@@ -107,6 +107,10 @@ namespace daemon {
System::virtualWrapper & m_sysWrapper;
const utils::TpconfigLines & m_tpConfig;
const CastorConf & m_castorConf;
/** utility to find the drive on the system. This function logs
* all errors and hence does not throw exceptions. It returns NULL
* in case of failure. */
castor::tape::drives::DriveInterface * findDrive(LogContext & lc);
/** sub-part of execute for the read sessions */
void executeRead(LogContext & lc);
/** sub-part of execute for a write session */
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment