Commit 0c0a9bac authored by David COME's avatar David COME
Browse files

Merge branch 'tapeserver'

parents 8ef77799 f3dbb029
package castor;
message HeartBeat {
required int64 bytesMoved = 1;
}
package castor;
message HeartBeat {
required int64 bytesMoved = 1;
}
......@@ -24,6 +24,8 @@
#include "castor/log/Logger.hpp"
#include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include <memory>
namespace castor {
namespace messages {
......@@ -126,6 +128,8 @@ public:
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) = 0;
virtual std::auto_ptr<castor::tape::tapeserver::daemon::TaskWatchDog>
createWatchdog(log::LogContext&) const = 0;
}; // class TapeserverProxy
} // namespace messages
......
......@@ -92,3 +92,10 @@ void castor::messages::TapeserverProxyDummy::tapeUnmounted(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
}
std::auto_ptr<castor::tape::tapeserver::daemon::TaskWatchDog>
castor::messages::TapeserverProxyDummy::createWatchdog(log::LogContext& lc) const{
return std::auto_ptr<castor::tape::tapeserver::daemon::TaskWatchDog>(
new castor::tape::tapeserver::daemon::DummyTaskWatchDog(lc)
);
}
\ No newline at end of file
......@@ -129,6 +129,9 @@ public:
void tapeUnmounted(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName);
virtual std::auto_ptr<castor::tape::tapeserver::daemon::TaskWatchDog>
createWatchdog(log::LogContext&) const;
}; // class TapeserverProxyDummy
......
......@@ -22,7 +22,7 @@
#include "castor/messages/TapeserverProxyDummy.hpp"
#include "castor/messages/TapeserverProxyDummyFactory.hpp"
#include "castor/log/LogContext.hpp"
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
......
......@@ -82,7 +82,7 @@ castor::messages::TapeserverProxyZmq::TapeserverProxyZmq(log::Logger &log,
void castor::messages::TapeserverProxyZmq::gotReadMountDetailsFromClient(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::messages::preFilleHeader();
castor::messages::Header header=castor::messages::preFillHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveBeforeMountStarted);
......@@ -106,7 +106,7 @@ uint64_t
castor::messages::TapeserverProxyZmq::gotWriteMountDetailsFromClient(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::messages::preFilleHeader();
castor::messages::Header header=castor::messages::preFillHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveBeforeMountStarted);
......@@ -129,7 +129,7 @@ uint64_t
void castor::messages::TapeserverProxyZmq::gotDumpMountDetailsFromClient(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::messages::preFilleHeader();
castor::messages::Header header=castor::messages::preFillHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveBeforeMountStarted);
......@@ -152,7 +152,7 @@ void castor::messages::TapeserverProxyZmq::gotDumpMountDetailsFromClient(
void castor::messages::TapeserverProxyZmq::tapeMountedForRead(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::messages::preFilleHeader();
castor::messages::Header header=castor::messages::preFillHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveTapeMounted);
......@@ -174,7 +174,7 @@ void castor::messages::TapeserverProxyZmq::tapeMountedForRead(
void castor::messages::TapeserverProxyZmq::tapeMountedForWrite(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::messages::preFilleHeader();
castor::messages::Header header=castor::messages::preFillHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveTapeMounted);
......@@ -196,7 +196,7 @@ void castor::messages::TapeserverProxyZmq::tapeMountedForWrite(
void castor::messages::TapeserverProxyZmq::tapeUnmounting(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::messages::preFilleHeader();
castor::messages::Header header=castor::messages::preFillHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveUnmountStarted);
......@@ -215,7 +215,7 @@ void castor::messages::TapeserverProxyZmq::tapeUnmounting(
void castor::messages::TapeserverProxyZmq::tapeUnmounted(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::messages::preFilleHeader();
castor::messages::Header header=castor::messages::preFillHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveTapeUnmounted);
......@@ -232,5 +232,14 @@ void castor::messages::TapeserverProxyZmq::tapeUnmounted(
// readReplyMsg
//-----------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::readReplyMsg() {
zmq::message_t blobHeader,blobBody;
m_socket.recv(&blobHeader);
m_socket.recv(&blobBody);
}
std::auto_ptr<castor::tape::tapeserver::daemon::TaskWatchDog>
castor::messages::TapeserverProxyZmq::createWatchdog(log::LogContext& lc) const{
return std::auto_ptr<castor::tape::tapeserver::daemon::TaskWatchDog>(
new castor::tape::tapeserver::daemon::TaskWatchDog(lc)
);
}
\ No newline at end of file
......@@ -133,6 +133,8 @@ public:
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName);
virtual std::auto_ptr<castor::tape::tapeserver::daemon::TaskWatchDog>
createWatchdog(log::LogContext&) const;
private:
/**
......
......@@ -32,7 +32,7 @@ void castor::messages::connectToLocalhost(zmq::socket_t& m_socket){
m_socket.connect(bindingAdress.c_str());
}
castor::messages::Header castor::messages::preFilleHeader() {
castor::messages::Header castor::messages::preFillHeader() {
castor::messages::Header header;
header.set_magic(TPMAGIC);
header.set_protocoltype(castor::messages::protocolType::Tape);
......
......@@ -47,7 +47,7 @@ template <class T> void sendMessage(zmq::socket_t& socket,const T& msg,int flag=
}
void connectToLocalhost(zmq::socket_t& m_socket);
castor::messages::Header preFilleHeader();
castor::messages::Header preFillHeader();
} // namespace messages
} // namespace castor
......@@ -31,6 +31,7 @@ add_library(castorTapeServerDaemon
TapeMessageHandler.cpp
TapeDaemonMain.cpp
TapeWriteTask.cpp
TaskWatchDog.cpp
VdqmAcceptHandler.cpp
VdqmConnectionHandler.cpp)
......
......@@ -44,7 +44,15 @@ TapeServerReporter::TapeServerReporter(
//waitThreads
//------------------------------------------------------------------------------
void TapeServerReporter::waitThreads(){
wait();
try{
wait();
}catch(const std::exception& e){
log::ScopedParamContainer sp(m_lc);
sp.add("what",e.what());
m_lc.log(LOG_ERR,"TapeServerReporter error caught while waiting");
}catch(...){
m_lc.log(LOG_ERR,"TapeServerReporter error triple ...");
}
}
//------------------------------------------------------------------------------
//tapeMountedForWrite
......@@ -93,7 +101,13 @@ TapeServerReporter::TapeServerReporter(
if(NULL==currentReport.get()) {
break;
}
currentReport->execute(*this);
try{
currentReport->execute(*this);
}catch(const std::exception& e){
log::ScopedParamContainer sp(m_lc);
sp.add("what",e.what());
m_lc.log(LOG_ERR,"TapeServerReporter error caught");
}
}
}
//------------------------------------------------------------------------------
......@@ -126,6 +140,11 @@ TapeServerReporter::TapeServerReporter(
execute(TapeServerReporter& parent){
parent.m_tapeserverProxy.tapeMountedForWrite(parent.m_volume, parent.m_unitName);
}
std::auto_ptr<TaskWatchDog> TapeServerReporter::
createWatchdog(log::LogContext& lc) const {
return m_tapeserverProxy.createWatchdog(lc);
}
}}}}
......@@ -24,7 +24,6 @@
#pragma once
#include "castor/messages/TapeserverProxy.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
......@@ -35,10 +34,13 @@
#include <stdint.h>
namespace castor {
namespace messages{
class TapeserverProxy;
}
namespace tape {
namespace tapeserver {
namespace daemon {
class TaskWatchDog;
class TapeServerReporter : private castor::tape::threading::Thread {
public:
......@@ -95,6 +97,8 @@ public:
void waitThreads();
void notifyWatchdog(uint64_t nbOfMemblocksMoved);
std::auto_ptr<TaskWatchDog> createWatchdog(log::LogContext& lc) const;
private:
/*
This internal mechanism could (should ?) be easily changed to a queue
......
......@@ -44,9 +44,10 @@
#include "castor/tape/tapeserver/daemon/GlobalStatusReporter.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/CapabilityUtils.hpp"
#include "castor/tape/tapeserver/daemon/MountSession.hpp"
using namespace castor::tape;
using namespace castor::log;
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
......@@ -468,3 +469,19 @@ castor::tape::tapeserver::daemon::MountSession::findDrive(const utils::DriveConf
return NULL;
}
}
//------------------------------------------------------------------------------
//MountSession::ctx
//------------------------------------------------------------------------------
zmq::context_t& castor::tape::tapeserver::daemon::MountSession::ctx(){
static zmq::context_t m_ctx;
return m_ctx;
}
//------------------------------------------------------------------------------
//destructor
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::MountSession::~MountSession(){
ctx().close();
}
\ No newline at end of file
......@@ -33,6 +33,7 @@
#include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "castor/tape/tapeserver/daemon/CapabilityUtils.hpp"
#include "TapeSingleThreadInterface.hpp"
#include "zmq/castorZmqWrapper.hpp"
using namespace castor::tape;
using namespace castor::log;
......@@ -113,6 +114,15 @@ namespace daemon {
int execute() ;
/** Temporary method used for debugging while building the session class */
std::string getVid() { return m_volInfo.vid; }
/**
* Return the global shared zmq context for the mount session
* TJIS FUNCTION SHALL ONLY BE CALLED IN THE FORKED PROCESS
* @return
*/
static zmq::context_t& ctx();
~MountSession();
private:
legacymsg::RtcpJobRqstMsgBody m_request;
castor::log::Logger & m_logger;
......
......@@ -70,7 +70,7 @@ private:
};
/*
TEST(tapeServer, MountSessionGooddayRecall) {
// TpcpClients only supports 32 bits session number
// This number has to be less than 2^31 as in addition there is a mix
......@@ -185,7 +185,7 @@ TEST(tapeServer, MountSessionGooddayRecall) {
temp += "";
ASSERT_EQ("V12345", sess.getVid());
}
*/
TEST(tapeServer, MountSessionNoSuchDrive) {
// TpcpClients only supports 32 bits session number
......
......@@ -822,7 +822,7 @@ void castor::tape::tapeserver::daemon::TapeDaemon::runMountSession(
params.push_back(log::Param("unitName", driveConfig.unitName));
m_log(LOG_INFO, "Mount-session child-process started", params);
zmq::context_t ctx;
try {
MountSession::CastorConf castorConf;
// This try bloc will allow us to send a failure notification to the client
......@@ -862,16 +862,12 @@ void castor::tape::tapeserver::daemon::TapeDaemon::runMountSession(
"RTCPD", "THREAD_POOL", (uint32_t)RTCPD_THREAD_POOL, &m_log);
rmc.reset(m_rmcFactory.create());
m_log(LOG_INFO, "Before context");
m_log(LOG_INFO, "After context");
try{
tapeserver.reset(m_tapeserverFactory.create(ctx));
tapeserver.reset(m_tapeserverFactory.create(MountSession::ctx()));
}
catch(const std::exception& e){
m_log(LOG_ERR, "Failed to connect ZMQ/REQ socket in MountSession");
}
m_log(LOG_INFO, "connect ZMQ context : OK");
mountSession.reset(new MountSession (
m_argc,
m_argv,
......@@ -885,7 +881,6 @@ m_log(LOG_INFO, "connect ZMQ context : OK");
m_capUtils,
castorConf
));
m_log(LOG_INFO, "foobar");
} catch (castor::exception::Exception & ex) {
try {
client::ClientProxy cl(drive->getVdqmJob());
......@@ -916,7 +911,9 @@ m_log(LOG_INFO, "connect ZMQ context : OK");
throw;
}
m_log(LOG_INFO, "Going to execute Mount Session");
exit (mountSession->execute());
int result = mountSession->execute();
// MountSession::ctx().close();
exit(result);
} catch(castor::exception::Exception & ex) {
params.push_back(log::Param("message", ex.getMessageValue()));
m_log(LOG_ERR,
......
......@@ -41,7 +41,14 @@
#include <sstream>
#include <string>
namespace{
class RAIIForContext{
zmq::context_t& m_context;
public:
RAIIForContext(zmq::context_t& ctx):m_context(ctx){}
~RAIIForContext(){m_context.close();}
};
}
//------------------------------------------------------------------------------
// exceptionThrowingMain
//
......@@ -121,6 +128,7 @@ static int exceptionThrowingMain(const int argc, char **const argv, castor::log:
const std::string vmgrHostName =
castor::common::CastorConfiguration::getConfig().getConfEntString("VMGR", "HOST");
zmq::context_t ctx;
RAIIForContext raii(ctx);
// Parse /etc/castor/TPCONFIG
castor::tape::utils::TpconfigLines tpconfigLines;
castor::tape::utils::parseTpconfigFile("/etc/castor/TPCONFIG", tpconfigLines);
......
......@@ -171,8 +171,10 @@ const messages::Header& header){
}
break;
case messages::reqType::NotifyDriveTapeUnmounted:
sendEmptyReplyToClient();
break;
case messages::reqType::NotifyDriveUnmountStarted:
sendEmptyReplyToClient();
break;
default:
m_log(LOG_ERR,"default dispatch in TapeMessageHandler");
......@@ -249,7 +251,7 @@ const castor::messages::NotifyDriveTapeMounted& body){
}
void castor::tape::tapeserver::daemon::TapeMessageHandler::sendEmptyReplyToClient(){
castor::messages::Header header = castor::messages::preFilleHeader();
castor::messages::Header header = castor::messages::preFillHeader();
header.set_reqtype(messages::reqType::NoReturnValue);
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
......
......@@ -190,10 +190,10 @@ private:
m_gsr.tapeMountedForRead();
tape::utils::Timer timer;
TaskWatchDog watchdog(m_logContext);
std::auto_ptr<TaskWatchDog> watchdog(m_gsr.createWatchdog(m_logContext));
//start the threading and ask to initiate the protocol with the tapeserverd
watchdog.startThread();
// ::sleep();
watchdog->startThread();
// Then we will loop on the tasks as they get from
// the task injector
while(1) {
......@@ -201,7 +201,7 @@ private:
TapeReadTask * task = popAndRequestMoreJobs();
m_logContext.log(LOG_DEBUG, "TapeReadThread: just got one more job");
if (task) {
task->execute(*rs, m_logContext,watchdog);
task->execute(*rs, m_logContext,*watchdog);
delete task;
} else {
log::LogContext::ScopedParam sp0(m_logContext, log::Param("time taken", timer.secs()));
......@@ -209,7 +209,7 @@ private:
break;
}
}
watchdog.stopThread();
watchdog->stopThread();
} catch(const castor::exception::Exception& e){
// we can only end there because
// moundTape, waitForDrive or crating the ReadSession failed
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment