Commit 6e39b2e0 authored by Steven Murray's avatar Steven Murray
Browse files

TapeserverProxyZmq is now thread-safe

parent 8e691963
......@@ -35,6 +35,7 @@
#include "castor/messages/TapeserverProxyZmq.hpp"
#include "castor/messages/TapeUnmounted.pb.h"
#include "castor/messages/TapeUnmountStarted.pb.h"
#include "castor/server/MutexLocker.hpp"
//------------------------------------------------------------------------------
// constructor
......@@ -52,6 +53,8 @@ castor::messages::TapeserverProxyZmq::TapeserverProxyZmq(log::Logger &log,
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::gotRecallJobFromTapeGateway(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createRecallJobFromTapeGatewayFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -108,6 +111,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::gotRecallJobFromReadTp(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createRecallJobFromReadTpFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -163,6 +168,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
uint32_t castor::messages::TapeserverProxyZmq::gotMigrationJobFromTapeGateway(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createMigrationJobFromTapeGatewayFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -212,6 +219,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
uint32_t castor::messages::TapeserverProxyZmq::gotMigrationJobFromWriteTp(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createMigrationJobFromWriteTpFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -261,6 +270,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::tapeMountedForRecall(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createTapeMountedForRecallFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -316,6 +327,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::tapeMountedForMigration(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createTapeMountedForMigrationFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -371,6 +384,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::tapeUnmountStarted(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createTapeUnmountStartedFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -426,6 +441,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::tapeUnmounted(
const std::string &vid, const std::string &unitName) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createTapeUnmountedFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
......@@ -481,6 +498,7 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//-----------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::notifyHeartbeat(
const std::string &unitName, const uint64_t nbBlocksMoved) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createHeartbeatFrame(unitName, nbBlocksMoved);
......@@ -537,6 +555,8 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::labelError(
const std::string &unitName, const castor::exception::Exception &labelEx) {
server::MutexLocker lock(&m_mutex);
try {
const Frame rqst = createLabelErrorFrame(unitName, labelEx);
sendFrame(m_serverSocket, rqst);
......
......@@ -24,7 +24,8 @@
#include "castor/log/Logger.hpp"
#include "castor/messages/Frame.hpp"
#include "castor/messages/TapeserverProxy.hpp"
#include "castor/messages/ZmqSocketMT.hpp"
#include "castor/messages/ZmqSocketST.hpp"
#include "castor/server/Mutex.hpp"
namespace castor {
namespace messages {
......@@ -151,6 +152,12 @@ public:
private:
/**
* Mutex used to implement a critical section around the enclosed
* ZMQ socket.
*/
castor::server::Mutex m_mutex;
/**
* The object representing the API of the CASTOR logging system.
*/
......@@ -169,7 +176,7 @@ private:
/**
* Socket connecting this tape server proxy to the tape server daemon.
*/
ZmqSocketMT m_serverSocket;
ZmqSocketST m_serverSocket;
/**
* Creates a frame containing a RecallJobFromTapeGateway message.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment