Commit f9249bb1 authored by Victor Kotlyar's avatar Victor Kotlyar
Browse files

Add notification from taped to the catalogue when tape mounted for read or write.

parent c7ec6523
......@@ -55,7 +55,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::DataTransferSession(
cta::tape::daemon::TapedProxy & initialProcess,
cta::server::ProcessCap & capUtils,
const DataTransferConfig & castorConf,
cta::Scheduler & scheduler):
cta::Scheduler & scheduler,
cta::catalogue::Catalogue &catalogue):
m_log(log),
m_sysWrapper(sysWrapper),
m_driveConfig(driveConfig),
......@@ -64,7 +65,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::DataTransferSession(
m_mc(mc),
m_intialProcess(initialProcess),
m_capUtils(capUtils),
m_scheduler(scheduler)
m_scheduler(scheduler),
m_catalogue(catalogue)
{
}
......@@ -236,7 +238,8 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
TapeReadSingleThread trst(*drive, m_mc, tsr, m_volInfo,
m_castorConf.bulkRequestRecallMaxFiles,m_capUtils,rwd,lc,rrp,
m_castorConf.useLbp, m_castorConf.useRAO, m_castorConf.externalEncryptionKeyScript,*retrieveMount);
m_castorConf.useLbp, m_castorConf.useRAO, m_castorConf.externalEncryptionKeyScript,*retrieveMount,
m_catalogue);
DiskWriteThreadPool dwtp(m_castorConf.nbDiskThreads,
rrp,
rwd,
......@@ -338,7 +341,8 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
m_castorConf.maxBytesBeforeFlush,
m_castorConf.useLbp,
m_castorConf.externalEncryptionKeyScript,
*archiveMount);
*archiveMount,
m_catalogue);
DiskReadThreadPool drtp(m_castorConf.nbDiskThreads,
m_castorConf.bulkRequestMigrationMaxFiles,
......
......@@ -64,7 +64,8 @@ namespace daemon {
cta::tape::daemon::TapedProxy & initialProcess,
cta::server::ProcessCap &capUtils,
const DataTransferConfig & castorConf,
cta::Scheduler &scheduler);
cta::Scheduler &scheduler,
cta::catalogue::Catalogue &catalogue);
/**
* Execute the session and return the type of action to be performed
......@@ -146,6 +147,10 @@ namespace daemon {
* The scheduler, i.e. the local interface to the Objectstore DB
*/
cta::Scheduler &m_scheduler;
/**
* Reference to the catalogue interface
*/
cta::catalogue::Catalogue &m_catalogue;
/**
* Returns the string representation of the specified mount type
......
......@@ -487,7 +487,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) {
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
castor::tape::tapeserver::daemon::DataTransferSession sess("tapeHost", logger, mockSys,
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
// 8) Run the data transfer session
sess.execute();
......@@ -692,7 +692,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) {
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
DataTransferSession sess("tapeHost", logger, mockSys,
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
// 8) Run the data transfer session
sess.execute();
......@@ -900,7 +900,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
castor::tape::tapeserver::daemon::DataTransferSession sess("tapeHost", logger, mockSys,
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
// 8) Run the data transfer session
sess.execute();
......@@ -1073,7 +1073,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) {
castor::messages::TapeserverProxyDummy initialProcess;
cta::server::ProcessCapDummy capUtils;
DataTransferSession sess("tapeHost", logger, mockSys,
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
ASSERT_NO_THROW(sess.execute());
std::string temp = logger.getLog();
ASSERT_NE(std::string::npos, logger.getLog().find("Could not stat path: /dev/noSuchDrive"));
......@@ -1223,7 +1223,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionFailtoMount) {
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
DataTransferSession sess("tapeHost", logger, mockSys,
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
ASSERT_NO_THROW(sess.execute());
std::string temp = logger.getLog();
ASSERT_NE(std::string::npos, logger.getLog().find("Failed to mount the tape"));
......@@ -1354,7 +1354,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayMigration) {
cta::mediachanger::MediaChangerFacade mc(dummyLog);
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
sess.execute();
std::string logToCheck = logger.getLog();
logToCheck += "";
......@@ -1504,7 +1504,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
cta::mediachanger::MediaChangerFacade mc(dummyLog);
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
sess.execute();
std::string temp = logger.getLog();
temp += "";
......@@ -1659,7 +1659,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullMigration) {
cta::mediachanger::MediaChangerFacade mc(dummyLog);
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
sess.execute();
std::string temp = logger.getLog();
temp += "";
......@@ -1819,7 +1819,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) {
cta::mediachanger::MediaChangerFacade mc(dummyLog);
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
DataTransferSession sess("tapeHost", logger, mockSys, driveConfig, mc, initialProcess, capUtils, castorConf, scheduler, catalogue);
sess.execute();
std::string temp = logger.getLog();
temp += "";
......
......@@ -38,7 +38,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread(
const bool useLbp,
const bool useRAO,
const std::string & externalEncryptionKeyScript,
const cta::RetrieveMount& retrieveMount) :
const cta::RetrieveMount& retrieveMount,
cta::catalogue::Catalogue &catalogue) :
TapeSingleThreadInterface<TapeReadTask>(drive, mc, initialProcess, volInfo,
capUtils, lc, externalEncryptionKeyScript),
m_maxFilesRequest(maxFilesRequest),
......@@ -46,7 +47,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread(
m_rrp(rrp),
m_useLbp(useLbp),
m_useRAO(useRAO),
m_retrieveMount(retrieveMount){}
m_retrieveMount(retrieveMount),
m_catalogue(catalogue){}
//------------------------------------------------------------------------------
//TapeCleaning::~TapeCleaning()
......@@ -300,7 +302,7 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
scoped.add("useLbp", m_useLbp);
scoped.add("detectedLbp", rs->isTapeWithLbp());
if (rs->isTapeWithLbp() && !m_useLbp) {
m_logContext.log(cta::log::WARNING, "Tapserver started without LBP support"
m_logContext.log(cta::log::WARNING, "Tapeserver started without LBP support"
" but the tape with LBP label mounted");
}
switch(m_drive.getLbpToUse()) {
......@@ -319,6 +321,17 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
}
m_initialProcess.reportState(cta::tape::session::SessionState::Running,
cta::tape::session::SessionType::Retrieve);
try {
m_catalogue.tapeMountedForRetrieve(m_volInfo.vid, m_drive.config.unitName);
} catch (cta::exception::Exception &ex) {
cta::log::ScopedParamContainer scoped(m_logContext);
params.add("tapeVid", m_volInfo.vid)
.add("tapeDrive", m_drive.config.unitName);
m_logContext.log(cta::log::WARNING,
"Failed to update catalogue for the tape mounted for retrieve.");
}
m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
// Then we will loop on the tasks as they get from
// the task injector
......
......@@ -69,7 +69,8 @@ public:
const bool useLbp,
const bool useRAO,
const std::string & externalEncryptionKeyScript,
const cta::RetrieveMount &retrieveMount);
const cta::RetrieveMount &retrieveMount,
cta::catalogue::Catalogue &catalogue);
/**
* Set the task injector. Has to be done that way (and not in the constructor)
......@@ -159,6 +160,10 @@ private:
* on which we are reading
*/
const cta::RetrieveMount& m_retrieveMount;
/**
* Reference to the catalogue interface
*/
cta::catalogue::Catalogue &m_catalogue;
/// Helper virtual function to access the watchdog from parent class
virtual void countTapeLogError(const std::string & error) {
......
......@@ -38,7 +38,8 @@ castor::tape::tapeserver::drive::DriveInterface & drive,
cta::server::ProcessCap &capUtils,
uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush,
const bool useLbp, const std::string & externalEncryptionKeyScript,
const cta::ArchiveMount & archiveMount):
const cta::ArchiveMount & archiveMount,
cta::catalogue::Catalogue &catalogue):
TapeSingleThreadInterface<TapeWriteTask>(drive, mc, tsr, volInfo,
capUtils, lc, externalEncryptionKeyScript),
m_filesBeforeFlush(filesBeforeFlush),
......@@ -49,7 +50,8 @@ castor::tape::tapeserver::drive::DriveInterface & drive,
m_compress(true),
m_useLbp(useLbp),
m_watchdog(mwd),
m_archiveMount(archiveMount){}
m_archiveMount(archiveMount),
m_catalogue(catalogue){}
//------------------------------------------------------------------------------
//TapeCleaning::~TapeCleaning()
......@@ -397,6 +399,17 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
m_initialProcess.reportState(cta::tape::session::SessionState::Running,
cta::tape::session::SessionType::Archive);
try {
m_catalogue.tapeMountedForArchive(m_volInfo.vid, m_drive.config.unitName);
} catch (cta::exception::Exception &ex) {
cta::log::ScopedParamContainer scoped(m_logContext);
params.add("tapeVid", m_volInfo.vid)
.add("tapeDrive", m_drive.config.unitName);
m_logContext.log(cta::log::WARNING,
"Failed to update catalogue for the tape mounted for archive.");
}
uint64_t bytes=0;
uint64_t files=0;
m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
......
......@@ -70,7 +70,8 @@ public:
cta::server::ProcessCap &capUtils,
uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush, const bool useLbp,
const std::string & externalEncryptionKeyScript,
const cta::ArchiveMount & archiveMount);
const cta::ArchiveMount & archiveMount,
cta::catalogue::Catalogue &catalogue);
/**
*
......@@ -197,6 +198,11 @@ private:
*/
const cta::ArchiveMount & m_archiveMount;
/**
* Reference to the catalogue interface
*/
cta::catalogue::Catalogue & m_catalogue;
protected:
/***
* Helper virtual function to access the watchdog from parent class
......
......@@ -1091,7 +1091,8 @@ int DriveHandler::runChild() {
driveHandlerProxy,
capUtils,
dataTransferConfig,
scheduler);
scheduler,
*catalogue);
auto ret = dataTransferSession.execute();
agentHeartbeat.stopAndWaitThread();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment