Commit c23fdcee authored by Steven Murray's avatar Steven Murray
Browse files

Separated I/O skeleton logic of TapeMessageHandler from its business logic

parent b161b7f4
......@@ -223,6 +223,7 @@ set (CLIENT_LIB_SRC_FILES
stager/BulkRequestResult.cpp
stager/FileResult.cpp
utils/DebugBuf.cpp
utils/ScopedLock.cpp
utils/SmartFd.cpp
utils/SmartFILEPtr.cpp
utils/utils.cpp
......
......@@ -4,6 +4,7 @@ add_library(castormessages SHARED
ForkDataTransfer.pb.cc
ForkLabel.pb.cc
ForkSucceeded.pb.cc
Frame.cpp
Header.pb.cc
Heartbeat.pb.cc
messages.cpp
......@@ -14,6 +15,7 @@ add_library(castormessages SHARED
ProcessExited.pb.cc
RecallJobFromReadTp.pb.cc
RecallJobFromTapeGateway.pb.cc
ReturnValue.pb.cc
StopProcessForker.pb.cc
TapeMountedForMigration.pb.cc
TapeMountedForRecall.pb.cc
......@@ -25,6 +27,9 @@ add_library(castormessages SHARED
TapeserverProxyZmqFactory.cpp
TapeUnmountStarted.pb.cc
TapeUnmounted.pb.cc
ZmqMsg.cpp
ZmqSocket.cpp
ZmqSocketMT.cpp
)
target_link_libraries(castormessages protobuf ssl)
......
......@@ -23,31 +23,29 @@
#pragma once
namespace castor {
namespace messages{
struct protocolType{
enum {
Tape
};
};
struct reqType{
enum {
ReturnValue,
Heartbeat,
MigrationJobFromTapeGateway,
MigrationJobFromWriteTp,
NbFilesOnTape,
RecallJobFromReadTp,
RecallJobFromTapeGateway,
TapeMountedForMigration,
TapeMountedForRecall,
TapeUnmountStarted,
TapeUnmounted
};
};
struct protocolVersion{
enum {
prototype
};
};
}
}
namespace messages {
enum ProtocolType {
PROTOCOL_TYPE_TAPE
};
enum MsgType {
MSG_TYPE_RETURNVALUE,
MSG_TYPE_HEARTBEAT,
MSG_TYPE_MIGRATIONJOBFROMTAPEGATEWAY,
MSG_TYPE_MIGRATIONJOBFROMWRITETP,
MSG_TYPE_NBFILESONTAPE,
MSG_TYPE_RECALLJOBFROMREADTP,
MSG_TYPE_RECALLJOBFROMTAPEGATEWAY,
MSG_TYPE_TAPEMOUNTEDFORMIGRATION,
MSG_TYPE_TAPEMOUNTEDFORRECALL,
MSG_TYPE_TAPEUNMOUNTSTARTED,
MSG_TYPE_TAPEUNMOUNTED
};
enum ProtocolVersion {
PROTOCOL_VERSION_1
};
} // namespace messages
} // namespace castor
/******************************************************************************
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/exception/Exception.hpp"
#include "castor/messages/Constants.hpp"
#include "castor/messages/Frame.hpp"
#include "castor/messages/messages.hpp"
//------------------------------------------------------------------------------
// calcAndSetHashValueOfBody
//------------------------------------------------------------------------------
void castor::messages::Frame::calcAndSetHashValueOfBody() {
try {
header.set_bodyhashvalue(messages::computeSHA1Base64(body));
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Frame failed to calculate the hash value of the frame"
"body and store it in the header: " << ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// checkHashValueOfBody
//------------------------------------------------------------------------------
void castor::messages::Frame::checkHashValueOfBody() const {
const std::string bodyHash = castor::messages::computeSHA1Base64(body);
if(bodyHash != header.bodyhashvalue()){
castor::exception::Exception ex;
ex.getMessage() << "Hash value of frame body does match the value stored"
" in the header: header.bodyhashvalue=" << header.bodyhashvalue() <<
" bodyHash=" << bodyHash;
throw ex;
}
}
//------------------------------------------------------------------------------
// serialiseHeaderToZmqMsg
//------------------------------------------------------------------------------
void castor::messages::Frame::serializeHeaderToZmqMsg(ZmqMsg &msg) const {
try {
if(!header.IsInitialized()) {
castor::exception::Exception ex;
ex.getMessage() << "Frame header is not initialized";
throw ex;
}
if(header.ByteSize() != (int)msg.size()) {
castor::exception::Exception ex;
ex.getMessage() << "Size of frame header does not match that of ZMQ"
" message: header.ByteSize()=" << header.ByteSize() << " msg.size()="
<< msg.size();
throw ex;
}
if(!header.SerializeToArray(msg.getData(), header.ByteSize())) {
castor::exception::Exception ex;
ex.getMessage() << "header.SerializeToArray() returned false";
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to serialize frame header to ZMQ message: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// parseZmqMsgIntoHeader
//------------------------------------------------------------------------------
void castor::messages::Frame::parseZmqMsgIntoHeader(const ZmqMsg &msg) {
try {
if(!header.ParseFromArray(msg.getData(), msg.size())) {
castor::exception::Exception ex;
ex.getMessage() << "header.ParseFromArray() returned false";
throw ex;
}
if(TPMAGIC != header.magic()) {
castor::exception::Exception ex;
ex.getMessage() << "Wrong magic number in the header: expected=" <<
TPMAGIC << " actual=" << header.magic();
throw ex;
}
if(PROTOCOL_TYPE_TAPE != header.protocoltype()) {
castor::exception::Exception ex;
ex.getMessage() << "Wrong protocol specified in the header: expected=" <<
PROTOCOL_TYPE_TAPE << " actual=" << header.protocoltype();
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to parse ZMQ message into frame header: " <<
ne.getMessage().str();
throw ex;
}
}
/******************************************************************************
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/messages/Header.pb.h"
#include "castor/messages/ZmqMsg.hpp"
#include "castor/utils/utils.hpp"
#include <string>
namespace castor {
namespace messages {
/**
* Structure representing a message frame.
*/
struct Frame {
/**
* The header of the frame.
*/
messages::Header header;
/**
* The body of the frame.
*/
std::string body;
/**
* Calculates the heash value of the frame body and records the result in the
* frame header.
*/
void calcAndSetHashValueOfBody();
/**
* Checks the hash value field of the header against the body of the frame.
*/
void checkHashValueOfBody() const;
/**
* Serializes the frame header to the specified ZMQ message.
*
* Please note that the specified size of the specified ZMQ message must
* match that of the header.
*
* @param msg Output parameter: The ZMQ message.
*/
void serializeHeaderToZmqMsg(ZmqMsg &msg) const;
/**
* Parses the specified ZMQ message into the frame header.
*
* @param msg The ZMQ message.
*/
void parseZmqMsgIntoHeader(const ZmqMsg &msg);
/**
* Serializes the specified protocol buffer into the frame body.
*
* @param protocolBuffer The protocol buffer.
*/
template <class T> void serializeProtocolBufferIntoBody(
const T &protocolBuffer) {
if(!protocolBuffer.SerializeToString(&body)) {
castor::exception::Exception ex;
ex.getMessage() << "Frame failed to serialize protocol buffer " <<
castor::utils::demangledNameOf(protocolBuffer) << " into frame body: "
<< ": SerializeToString() returned false";
throw ex;
}
}
/**
* Parses the body into the specified protocol buffer.
*
* @param pb Output parameter: The protocol buffer to be written to.
*/
template <class T> void parseBodyIntoProtocolBuffer(T &protocolBuffer) const {
if(!protocolBuffer.ParseFromString(body)) {
castor::exception::Exception ex;
ex.getMessage() << "Frame failed to parse contents of enclosed ZMQ"
" message into protocol buffer " <<
castor::utils::demangledNameOf(protocolBuffer)
<< ": ParseFromString() returned false";
throw ex;
}
}
}; // struct Frame
} // namespace messages
} // namespace castor
This diff is collapsed.
<
......@@ -34,7 +34,6 @@ void protobuf_AssignDesc_Header_2eproto();
void protobuf_ShutdownFile_Header_2eproto();
class Header;
class ReturnValue;
// ===================================================================
......@@ -99,61 +98,61 @@ class Header : public ::google::protobuf::Message {
inline ::google::protobuf::uint32 magic() const;
inline void set_magic(::google::protobuf::uint32 value);
// required uint32 protocolType = 2;
// required uint32 protocoltype = 2;
inline bool has_protocoltype() const;
inline void clear_protocoltype();
static const int kProtocolTypeFieldNumber = 2;
static const int kProtocoltypeFieldNumber = 2;
inline ::google::protobuf::uint32 protocoltype() const;
inline void set_protocoltype(::google::protobuf::uint32 value);
// required uint32 protocolVersion = 3;
// required uint32 protocolversion = 3;
inline bool has_protocolversion() const;
inline void clear_protocolversion();
static const int kProtocolVersionFieldNumber = 3;
static const int kProtocolversionFieldNumber = 3;
inline ::google::protobuf::uint32 protocolversion() const;
inline void set_protocolversion(::google::protobuf::uint32 value);
// required uint32 reqType = 4;
inline bool has_reqtype() const;
inline void clear_reqtype();
static const int kReqTypeFieldNumber = 4;
inline ::google::protobuf::uint32 reqtype() const;
inline void set_reqtype(::google::protobuf::uint32 value);
// required uint32 msgtype = 4;
inline bool has_msgtype() const;
inline void clear_msgtype();
static const int kMsgtypeFieldNumber = 4;
inline ::google::protobuf::uint32 msgtype() const;
inline void set_msgtype(::google::protobuf::uint32 value);
// required string bodyHashType = 5;
// required string bodyhashtype = 5;
inline bool has_bodyhashtype() const;
inline void clear_bodyhashtype();
static const int kBodyHashTypeFieldNumber = 5;
static const int kBodyhashtypeFieldNumber = 5;
inline const ::std::string& bodyhashtype() const;
inline void set_bodyhashtype(const ::std::string& value);
inline void set_bodyhashtype(const char* value);
inline void set_bodyhashtype(const char* value, size_t size);
inline ::std::string* mutable_bodyhashtype();
// required string bodyHashValue = 6;
// required string bodyhashValue = 6;
inline bool has_bodyhashvalue() const;
inline void clear_bodyhashvalue();
static const int kBodyHashValueFieldNumber = 6;
static const int kBodyhashValueFieldNumber = 6;
inline const ::std::string& bodyhashvalue() const;
inline void set_bodyhashvalue(const ::std::string& value);
inline void set_bodyhashvalue(const char* value);
inline void set_bodyhashvalue(const char* value, size_t size);
inline ::std::string* mutable_bodyhashvalue();
// required string bodySignatureType = 7;
// required string bodysignaturetype = 7;
inline bool has_bodysignaturetype() const;
inline void clear_bodysignaturetype();
static const int kBodySignatureTypeFieldNumber = 7;
static const int kBodysignaturetypeFieldNumber = 7;
inline const ::std::string& bodysignaturetype() const;
inline void set_bodysignaturetype(const ::std::string& value);
inline void set_bodysignaturetype(const char* value);
inline void set_bodysignaturetype(const char* value, size_t size);
inline ::std::string* mutable_bodysignaturetype();
// required string bodySignature = 8;
// required string bodysignature = 8;
inline bool has_bodysignature() const;
inline void clear_bodysignature();
static const int kBodySignatureFieldNumber = 8;
static const int kBodysignatureFieldNumber = 8;
inline const ::std::string& bodysignature() const;
inline void set_bodysignature(const ::std::string& value);
inline void set_bodysignature(const char* value);
......@@ -168,7 +167,7 @@ class Header : public ::google::protobuf::Message {
::google::protobuf::uint32 magic_;
::google::protobuf::uint32 protocoltype_;
::google::protobuf::uint32 protocolversion_;
::google::protobuf::uint32 reqtype_;
::google::protobuf::uint32 msgtype_;
::std::string* bodyhashtype_;
static const ::std::string _default_bodyhashtype_;
::std::string* bodyhashvalue_;
......@@ -197,107 +196,6 @@ class Header : public ::google::protobuf::Message {
void InitAsDefaultInstance();
static Header* default_instance_;
};
// -------------------------------------------------------------------
class ReturnValue : public ::google::protobuf::Message {
public:
ReturnValue();
virtual ~ReturnValue();
ReturnValue(const ReturnValue& from);
inline ReturnValue& operator=(const ReturnValue& from) {
CopyFrom(from);
return *this;
}
inline const ::google::protobuf::UnknownFieldSet& unknown_fields() const {
return _unknown_fields_;
}
inline ::google::protobuf::UnknownFieldSet* mutable_unknown_fields() {
return &_unknown_fields_;
}
static const ::google::protobuf::Descriptor* descriptor();
static const ReturnValue& default_instance();
void Swap(ReturnValue* other);
// implements Message ----------------------------------------------
ReturnValue* New() const;
void CopyFrom(const ::google::protobuf::Message& from);
void MergeFrom(const ::google::protobuf::Message& from);
void CopyFrom(const ReturnValue& from);
void MergeFrom(const ReturnValue& from);
void Clear();
bool IsInitialized() const;
int ByteSize() const;
bool MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input);
void SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const;
::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const;
int GetCachedSize() const { return _cached_size_; }
private:
void SharedCtor();
void SharedDtor();
void SetCachedSize(int size) const;
public:
::google::protobuf::Metadata GetMetadata() const;
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
// required uint32 returnValue = 1;
inline bool has_returnvalue() const;
inline void clear_returnvalue();
static const int kReturnValueFieldNumber = 1;
inline ::google::protobuf::uint32 returnvalue() const;
inline void set_returnvalue(::google::protobuf::uint32 value);
// required string message = 2;
inline bool has_message() const;
inline void clear_message();
static const int kMessageFieldNumber = 2;
inline const ::std::string& message() const;
inline void set_message(const ::std::string& value);
inline void set_message(const char* value);
inline void set_message(const char* value, size_t size);
inline ::std::string* mutable_message();
// @@protoc_insertion_point(class_scope:castor.messages.ReturnValue)
private:
::google::protobuf::UnknownFieldSet _unknown_fields_;
mutable int _cached_size_;
::google::protobuf::uint32 returnvalue_;
::std::string* message_;
static const ::std::string _default_message_;
friend void protobuf_AddDesc_Header_2eproto();
friend void protobuf_AssignDesc_Header_2eproto();
friend void protobuf_ShutdownFile_Header_2eproto();
::google::protobuf::uint32 _has_bits_[(2 + 31) / 32];
// WHY DOES & HAVE LOWER PRECEDENCE THAN != !?
inline bool _has_bit(int index) const {
return (_has_bits_[index / 32] & (1u << (index % 32))) != 0;
}
inline void _set_bit(int index) {
_has_bits_[index / 32] |= (1u << (index % 32));
}
inline void _clear_bit(int index) {
_has_bits_[index / 32] &= ~(1u << (index % 32));
}
void InitAsDefaultInstance();
static ReturnValue* default_instance_;
};
// ===================================================================
......@@ -321,7 +219,7 @@ inline void Header::set_magic(::google::protobuf::uint32 value) {
magic_ = value;
}
// required uint32 protocolType = 2;
// required uint32 protocoltype = 2;
inline bool Header::has_protocoltype() const {
return _has_bit(1);
}
......@@ -337,7 +235,7 @@ inline void Header::set_protocoltype(::google::protobuf::uint32 value) {
protocoltype_ = value;
}
// required uint32 protocolVersion = 3;
// required uint32 protocolversion = 3;
inline bool Header::has_protocolversion() const {
return _has_bit(2);
}
......@@ -353,23 +251,23 @@ inline void Header::set_protocolversion(::google::protobuf::uint32 value) {
protocolversion_ = value;
}
// required uint32 reqType = 4;
inline bool Header::has_reqtype() const {
// required uint32 msgtype = 4;
inline bool Header::has_msgtype() const {
return _has_bit(3);
}
inline void Header::clear_reqtype() {
reqtype_ = 0u;
inline void Header::clear_msgtype() {
msgtype_ = 0u;
_clear_bit(3);
}
inline ::google::protobuf::uint32 Header::reqtype() const {
return reqtype_;
inline ::google::protobuf::uint32 Header::msgtype() const {
return msgtype_;
}
inline void Header::set_reqtype(::google::protobuf::uint32 value) {
inline void Header::set_msgtype(::google::protobuf::uint32 value) {
_set_bit(3);
reqtype_ = value;
msgtype_ = value;
}
// required string bodyHashType = 5;
// required string bodyhashtype = 5;
inline bool Header::has_bodyhashtype() const {
return _has_bit(4);
}
......@@ -411,7 +309,7 @@ inline ::std::string* Header::mutable_bodyhashtype() {
return bodyhashtype_;
}
// required string bodyHashValue = 6;
// required string bodyhashValue = 6;
inline bool Header::has_bodyhashvalue() const {
return _has_bit(5);
}
......@@ -453,7 +351,7 @@ inline ::std::string* Header::mutable_bodyhashvalue() {
return bodyhashvalue_;
}
// required string bodySignatureType = 7;
// required string bodysignaturetype = 7;
inline bool Header::has_bodysignaturetype() const {
return _has_bit(6);
}
......@@ -495,7 +393,7 @@ inline ::std::string* Header::mutable_bodysignaturetype() {
return bodysignaturetype_;
}
// required string bodySignature = 8;
// required string bodysignature = 8;
inline bool Header::has_bodysignature() const {
return _has_bit(7);
}
......@@ -537,68 +435,6 @@ inline ::std::string* Header::mutable_bodysignature() {
return bodysignature_;
}
// -------------------------------------------------------------------
// ReturnValue
// required uint32 returnValue = 1;
inline bool ReturnValue::has_returnvalue() const {
return _has_bit(0);
}
inline void ReturnValue::clear_returnvalue() {