Commit 2bf36ce4 authored by Eric Cano's avatar Eric Cano
Browse files

Simplified the interface of the TapedProxy and TapeServerReporter.

The TapeServerReporter, which is the main interface for the tape thread and tasks,
now has a simple reportState() function instead of mnay ad-hoc ones. The only ones
remaining are reportTapeUnmountedForRetrieve() and reportDiskCompleteForRetrieve()
as they allow managing the special case of retrieve where the session can from
Running to either ShutingDown or DrainingToDisk depending on which order the
threads complete.

The actual calls to send messages to taped are now 3: reportState,
addLog, removeLog.
parent 5870f0d5
......@@ -62,6 +62,7 @@ target_link_libraries(ctamessages
castorlog
ctamediachanger
ctautils
ctatapesession
protobuf
ssl
zmq)
......
......@@ -32,48 +32,6 @@ void castor::messages::TapeserverProxyDummy::reportState(const cta::tape::sessio
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) {}
//------------------------------------------------------------------------------
// gotRetrieveJobFromCTA
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::gotRetrieveJobFromCTA(
const std::string &vid, const std::string &unitName) {
}
//------------------------------------------------------------------------------
// tapeMountedForRecall
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::tapeMountedForRecall(
const std::string &vid, const std::string &unitName) {
}
//------------------------------------------------------------------------------
// tapeMountedForMigration
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::tapeMountedForMigration(
const std::string &vid, const std::string &unitName) {
}
//------------------------------------------------------------------------------
// tapeUnmountStarted
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::tapeUnmountStarted(
const std::string &vid, const std::string &unitName) {
}
//------------------------------------------------------------------------------
// tapeUnmounted
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::tapeUnmounted(
const std::string &vid, const std::string &unitName) {
}
//------------------------------------------------------------------------------
// notifyHeartbeat
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::
notifyHeartbeat(const std::string &unitName, const uint64_t nbBlocksMoved) {
}
//------------------------------------------------------------------------------
// addLogParams
//------------------------------------------------------------------------------
......
......@@ -38,24 +38,6 @@ public:
void reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) override;
void gotRetrieveJobFromCTA(const std::string &vid,
const std::string &unitName) override;
void tapeMountedForRecall(const std::string &vid,
const std::string &unitName) override;
void tapeMountedForMigration(const std::string &vid,
const std::string &unitName) override;
void tapeUnmountStarted(const std::string &vid,
const std::string &unitName) override;
void tapeUnmounted(const std::string &vid,
const std::string &unitName) override;
void notifyHeartbeat(const std::string &unitName,
const uint64_t nbBlocksMoved) override;
void addLogParams(const std::string &unitName,
const std::list<castor::log::Param> & params) override;
......
......@@ -40,6 +40,7 @@
#include "castor/messages/TapeserverProxyZmq.hpp"
#include "castor/messages/TapeUnmounted.pb.h"
#include "castor/messages/TapeUnmountStarted.pb.h"
#include "common/exception/Exception.hpp"
//------------------------------------------------------------------------------
// constructor
......@@ -62,6 +63,20 @@ void castor::messages::TapeserverProxyZmq::reportState(const cta::tape::session:
if ((type == cta::tape::session::SessionType::Archive)
&& (state == cta::tape::session::SessionState::Mounting)) {
gotArchiveJobFromCTA(vid, m_driveName, 0);
} else if ((type == cta::tape::session::SessionType::Retrieve)
&& (state == cta::tape::session::SessionState::Mounting)) {
gotRetrieveJobFromCTA(vid, m_driveName);
} else if ((type == cta::tape::session::SessionType::Retrieve)
&& (state == cta::tape::session::SessionState::Mounting)) {
gotRetrieveJobFromCTA(vid, m_driveName);
} else {
std::stringstream err;
err << "In castor::messages::TapeserverProxyZmq::reportState(): "
<< "unexpected session type/state combination: "
<< "type=" << cta::tape::session::toString(state)
<< "state=" << cta::tape::session::toString(type);
throw cta::exception::Exception(err.str());
}
}
......
......@@ -55,25 +55,24 @@ private:
uint32_t gotArchiveJobFromCTA(const std::string &vid,
const std::string &unitName, const uint32_t nbFiles);
public:
void gotRetrieveJobFromCTA(const std::string &vid,
const std::string &unitName) override;
const std::string &unitName);
void tapeMountedForRecall(const std::string &vid,
const std::string &unitName) override;
void tapeMountedForMigration(const std::string &vid,
const std::string &unitName) override;
const std::string &unitName);
void tapeMountedForMigration(const std::string &vid,
const std::string &unitName);
void tapeUnmountStarted(const std::string &vid,
const std::string &unitName) override {}
const std::string &unitName) {}
void tapeUnmounted(const std::string &vid,
const std::string &unitName) override {}
const std::string &unitName) {}
void notifyHeartbeat(const std::string &unitName,
const uint64_t nbBlocksMoved) override;
const uint64_t nbBlocksMoved);
public:
virtual void addLogParams(const std::string &unitName,
const std::list<castor::log::Param> & params) override;
......
......@@ -148,7 +148,8 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
m_hostname, m_volInfo, lc);
//we retrieved the detail from the client in execute, so at this point
//we can already report !
tsr.gotReadMountDetailsFromClient();
tsr.reportState(cta::tape::session::SessionState::Mounting,
cta::tape::session::SessionType::Retrieve);
TapeReadSingleThread trst(*drive, m_mc, tsr, m_volInfo,
m_castorConf.bulkRequestRecallMaxFiles,m_capUtils,rwd,lc,rrp,m_castorConf.useLbp);
......@@ -271,9 +272,9 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
//we can report we will mount the tape.
// TODO: create a "StartingSession" state as the mounting will happen in
// the to-be-created tape thread.
// Initialise with something obviously wrong.
try {
tsr.gotWriteMountDetailsFromClient();
tsr.reportState(cta::tape::session::SessionState::Mounting,
cta::tape::session::SessionType::Archive);
} catch (cta::exception::Exception & e) {
log::LogContext::ScopedParam sp1(lc, log::Param("errorMessage", e.getMessage().str()));
lc.log(LOG_INFO, "Aborting the session after problem with mount details. Notifying the client.");
......
......@@ -95,7 +95,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter);
m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType() ?
"TapeReadSingleThread : tape unmounted":"TapeReadSingleThread : tape NOT unmounted (manual mode)");
m_this.m_initialProcess.tapeUnmounted();
m_this.m_initialProcess.reportTapeUnmountedForRetrieve();
m_this.m_stats.waitReportingTime += m_timer.secs(castor::utils::Timer::resetCounter);
} catch(const cta::exception::Exception& ex){
// Something failed during the cleaning
......@@ -247,7 +247,8 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
"unsupported LBP started");
}
}
m_initialProcess.tapeMountedForRead();
m_initialProcess.reportState(cta::tape::session::SessionState::Running,
cta::tape::session::SessionType::Retrieve);
m_stats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
// Then we will loop on the tasks as they get from
// the task injector
......
......@@ -34,6 +34,7 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//-----------------------------------------------------------------------------
//constructor
//------------------------------------------------------------------------------
......@@ -58,119 +59,112 @@ TapeServerReporter::TapeServerReporter(
//------------------------------------------------------------------------------
//finish
//------------------------------------------------------------------------------
void TapeServerReporter::finish(){
m_fifo.push(NULL);
}
void TapeServerReporter::finish(){
m_fifo.push(NULL);
}
//------------------------------------------------------------------------------
//startThreads
//------------------------------------------------------------------------------
void TapeServerReporter::startThreads(){
start();
m_threadRunnig=true;
}
void TapeServerReporter::startThreads(){
start();
m_threadRunnig=true;
}
//------------------------------------------------------------------------------
//waitThreads
//------------------------------------------------------------------------------
void TapeServerReporter::waitThreads(){
try{
wait();
m_threadRunnig=false;
}catch(const std::exception& e){
log::ScopedParamContainer sp(m_lc);
sp.add("what",e.what());
m_lc.log(LOG_ERR,"error caught while waiting");
}catch(...){
m_lc.log(LOG_ERR,"unknown error while waiting");
}
}
//------------------------------------------------------------------------------
//tapeMountedForWrite
//------------------------------------------------------------------------------
void TapeServerReporter::tapeMountedForWrite(){
m_fifo.push(
new ReportTapeMountedForWrite()
);
}
//------------------------------------------------------------------------------
//gotWriteMountDetailsFromClient
//------------------------------------------------------------------------------
void TapeServerReporter::gotWriteMountDetailsFromClient(){
m_tapeserverProxy.reportState(cta::tape::session::SessionState::Mounting,
cta::tape::session::SessionType::Archive, m_volume.vid);
}
void TapeServerReporter::waitThreads(){
try{
wait();
m_threadRunnig=false;
}catch(const std::exception& e){
log::ScopedParamContainer sp(m_lc);
sp.add("what",e.what());
m_lc.log(LOG_ERR,"error caught while waiting");
}catch(...){
m_lc.log(LOG_ERR,"unknown error while waiting");
}
}
//------------------------------------------------------------------------------
//gotReadMountDetailsFromClient
//reportState
//------------------------------------------------------------------------------
void TapeServerReporter::gotReadMountDetailsFromClient(){
m_fifo.push(
new ReportGotReadDetailsFromClient()
);
}
void TapeServerReporter::reportState(cta::tape::session::SessionState state,
cta::tape::session::SessionType type) {
m_fifo.push(new ReportStateChange(state, type));
}
//------------------------------------------------------------------------------
//tapeMountedForRead
//------------------------------------------------------------------------------
void TapeServerReporter::tapeMountedForRead(){
m_fifo.push(new ReportTapeMountedForRead());
}
//reportTapeUnmountedForRetrieve
//------------------------------------------------------------------------------
void TapeServerReporter::reportTapeUnmountedForRetrieve() {
m_fifo.push(new ReportTapeUnmountedForRetrieve());
}
//------------------------------------------------------------------------------
//tapeUnmounted
//------------------------------------------------------------------------------
void TapeServerReporter::tapeUnmounted(){
m_fifo.push(new ReportTapeUnmounted());
}
//reportDiskCompleteForRetrieve
//------------------------------------------------------------------------------
void TapeServerReporter::reportDiskCompleteForRetrieve() {
m_fifo.push(new ReportDiskCompleteForRetrieve());
}
//------------------------------------------------------------------------------
//run
//------------------------------------------------------------------------------
void TapeServerReporter::run(){
while(1){
std::unique_ptr<Report> currentReport(m_fifo.pop());
if(NULL==currentReport.get()) {
break;
}
try{
currentReport->execute(*this);
}catch(const std::exception& e){
log::ScopedParamContainer sp(m_lc);
sp.add("what",e.what());
m_lc.log(LOG_ERR,"TapeServerReporter error caught");
}
void TapeServerReporter::run(){
while(1){
std::unique_ptr<Report> currentReport(m_fifo.pop());
if(NULL==currentReport.get()) {
break;
}
try{
currentReport->execute(*this);
}catch(const std::exception& e){
log::ScopedParamContainer sp(m_lc);
sp.add("what",e.what());
m_lc.log(LOG_ERR,"TapeServerReporter error caught");
}
}
}
//------------------------------------------------------------------------------
// ReportGotReadDetailsFromClient::execute
//------------------------------------------------------------------------------
void TapeServerReporter::ReportGotReadDetailsFromClient::execute(
TapeServerReporter& parent){
log::ScopedParamContainer sp(parent.m_lc);
sp.add("unitName", parent.m_unitName)
.add("TPVID", parent.m_volume.vid);
// ReportStateChange::ReportStateChange())
//------------------------------------------------------------------------------
TapeServerReporter::ReportStateChange::ReportStateChange(cta::tape::session::SessionState state,
cta::tape::session::SessionType type): m_state(state), m_type(type) { }
return parent.m_tapeserverProxy.gotRetrieveJobFromCTA(parent.m_volume.vid, parent.m_unitName);
}
//------------------------------------------------------------------------------
// ReportTapeMountedForRead::execute
//------------------------------------------------------------------------------
void TapeServerReporter::ReportTapeMountedForRead::
execute(TapeServerReporter& parent){
parent.m_tapeserverProxy.tapeMountedForRecall(parent.m_volume.vid,
parent.m_unitName);
}
// ReportStateChange::execute())
//------------------------------------------------------------------------------
void TapeServerReporter::ReportStateChange::execute(TapeServerReporter& parent) {
parent.m_tapeserverProxy.reportState(m_state, m_type, parent.m_volume.vid);
}
//------------------------------------------------------------------------------
// ReportTapeUnmounted::execute
//------------------------------------------------------------------------------
void TapeServerReporter::ReportTapeUnmounted::
execute(TapeServerReporter& parent){
parent.m_tapeserverProxy.tapeUnmounted(parent.m_volume.vid,
parent.m_unitName);
}
// ReportTapeUnmountedForRetrieve::execute())
//------------------------------------------------------------------------------
void TapeServerReporter::ReportTapeUnmountedForRetrieve::execute(TapeServerReporter& parent) {
parent.m_tapeUnmountedForRecall=true;
if (parent.m_diskCompleteForRecall) {
parent.m_tapeserverProxy.reportState(cta::tape::session::SessionState::Shutdown,
cta::tape::session::SessionType::Retrieve, parent.m_volume.vid);
} else {
parent.m_tapeserverProxy.reportState(cta::tape::session::SessionState::DrainingToDisk,
cta::tape::session::SessionType::Retrieve, parent.m_volume.vid);
}
}
//------------------------------------------------------------------------------
// ReportTapeMounterForWrite::execute
//------------------------------------------------------------------------------
void TapeServerReporter::ReportTapeMountedForWrite::
execute(TapeServerReporter& parent){
parent.m_tapeserverProxy.tapeMountedForMigration(parent.m_volume.vid,
parent.m_unitName);
}
}}}}
// ReportDiskCompleteForRetrieve::execute())
//------------------------------------------------------------------------------
void TapeServerReporter::ReportDiskCompleteForRetrieve::execute(TapeServerReporter& parent) {
parent.m_diskCompleteForRecall=true;
if (parent.m_tapeUnmountedForRecall) {
parent.m_tapeserverProxy.reportState(cta::tape::session::SessionState::Shutdown,
cta::tape::session::SessionType::Retrieve, parent.m_volume.vid);
}
}
}}}} // namespace castor::tape::tapeserver::daemon
......@@ -28,6 +28,8 @@
#include "castor/tape/tapeserver/daemon/DriveConfig.hpp"
#include "castor/tape/tapeserver/daemon/VolumeInfo.hpp"
#include "castor/log/LogContext.hpp"
#include "tapeserver/session/SessionState.hpp"
#include "tapeserver/session/SessionType.hpp"
#include <memory>
#include <string>
#include <stdint.h>
......@@ -63,34 +65,26 @@ public:
* to stop
*/
void finish();
/**
* Will call TapeserverProxy::gotWriteMountDetailsFromClient
* @param unitName The unit name of the tape drive.
* @param vid The Volume ID of the tape to be mounted.
*/
void gotReadMountDetailsFromClient();
/**
* Will call TapeserverProxy::tapeUnmounted and VdqmProx::tapeUnmounted()
* Will call TapedProxy::reportState();
*/
void tapeUnmounted();
//------------------------------------------------------------------------------
void reportState(cta::tape::session::SessionState state,
cta::tape::session::SessionType type);
/**
* Will call TapeserverProxy::tapeMountedForRead,
* Special function managing the special case of retrieves, where disk and
* tape thread can finish in different orders (tape part)
*/
void tapeMountedForRead();
void tapeMountedForWrite();
void reportTapeUnmountedForRetrieve();
//The following function could be split into 2 parts
/**
* Notify the client we got the fseq of the first file to transfer we get in
* exchange return the number of files on the tape according to the VMGR
* @return the number of files on the tape according to the VMGR
* Special function managing the special case of retrieves, where disk and
* tape thread can finish in different orders (disk part)
*/
void gotWriteMountDetailsFromClient();
void reportDiskCompleteForRetrieve();
//------------------------------------------------------------------------------
//start and wait for thread to finish
void startThreads();
void waitThreads();
......@@ -113,22 +107,25 @@ private:
virtual ~Report(){}
virtual void execute(TapeServerReporter&)=0;
};
class ReportGotReadDetailsFromClient : public Report {
public:
virtual void execute(TapeServerReporter&);
};
class ReportTapeMountedForRead : public Report {
class ReportStateChange: public Report {
public:
virtual void execute(TapeServerReporter&);
ReportStateChange(cta::tape::session::SessionState state,
cta::tape::session::SessionType type);
void execute(TapeServerReporter&) override;
private:
cta::tape::session::SessionState m_state;
cta::tape::session::SessionType m_type;
};
class ReportTapeUnmounted : public Report {
class ReportTapeUnmountedForRetrieve: public Report {
public:
virtual void execute(TapeServerReporter&);
void execute(TapeServerReporter&) override;
};
class ReportTapeMountedForWrite : public Report {
class ReportDiskCompleteForRetrieve: public Report {
public:
virtual void execute(TapeServerReporter&);
void execute(TapeServerReporter&) override;
};
/**
* Inherited from Thread, it will do the job : pop a request, execute it
......@@ -151,6 +148,18 @@ private:
* Log context, copied because it is in a separated thread
*/
log::LogContext m_lc;
/**
* Boolean allowing the management of the special case of recall where
* end of tape and disk threads can happen in any order (tape side)
*/
bool m_tapeUnmountedForRecall = false;
/**
* Boolean allowing the management of the special case of recall where
* end of tape and disk threads can happen in any order (disk side)
*/
bool m_diskCompleteForRecall = false;
const std::string m_server;
const std::string m_unitName;
......
......@@ -223,7 +223,8 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
}
}
m_initialProcess.tapeMountedForWrite();
m_initialProcess.reportState(cta::tape::session::SessionState::Running,
cta::tape::session::SessionType::Archive);
uint64_t bytes=0;
uint64_t files=0;
m_stats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
......
......@@ -145,7 +145,8 @@ private:
m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter);
m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType() ?
"TapeWriteSingleThread : tape unmounted":"TapeWriteSingleThread : tape NOT unmounted (manual mode)");
m_this.m_initialProcess.tapeUnmounted();
m_this.m_initialProcess.reportState(cta::tape::session::SessionState::Shutdown,
cta::tape::session::SessionType::Archive);
m_this.m_stats.waitReportingTime += m_timer.secs(castor::utils::Timer::resetCounter);
}
catch(const cta::exception::Exception& ex){
......
......@@ -224,7 +224,7 @@ protected:
castor::server::MutexLocker locker(&m_mutex);
m_lc.log(LOG_DEBUG,"going to report");
m_reportTimer.reset();
m_initialProcess.notifyHeartbeat(m_driveUnitName, m_nbOfMemblocksMoved);
m_initialProcess.reportHeartbeat(m_nbOfMemblocksMoved, 0);
reportStats();
m_nbOfMemblocksMoved=0;
}
......
......@@ -22,11 +22,12 @@
namespace cta { namespace tape { namespace daemon {
DriveHandlerProxy::DriveHandlerProxy(server::SocketPair& socketPair): m_socketPair(socketPair) {
m_socketPair.close(server::SocketPair::Side::parent);
m_socketPair.close(server::SocketPair::Side::parent);
}
void DriveHandlerProxy::addLogParams(const std::string& unitName, const std::list<castor::log::Param>& params) {}
void DriveHandlerProxy::addLogParams(const std::string& unitName, const std::list<castor::log::Param>& params) {
throw exception::Exception(std::string(__FUNCTION__) + " not implemented");
}
}}} // namespace cta::tape::daemon
......@@ -40,13 +40,7 @@ public:
void reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) override;
void addLogParams(const std::string& unitName, const std::list<castor::log::Param>& params) override;
void deleteLogParams(const std::string& unitName, const std::list<std::string>& paramNames) override;
void gotRetrieveJobFromCTA(const std::string& vid, const std::string& unitName) override;
void labelError(const std::string& unitName, const std::string& message) override;
void notifyHeartbeat(const std::string& unitName, const uint64_t nbBytesMoved) override;
void tapeMountedForMigration(const std::string& vid, const std::string& unitName) override;
void tapeMountedForRecall(const std::string& vid, const std::string& unitName) override;
void tapeUnmountStarted(const std::string& vid, const std::string& unitName) override;
void tapeUnmounted(const std::string& vid, const std::string& unitName) override;