Commit a8531be0 authored by Steven Murray's avatar Steven Murray
Browse files

messages::Exception is now sent from tapeserverd to its clients

parent c8289901
......@@ -30,13 +30,14 @@ enum ProtocolType {
};
enum MsgType {
MSG_TYPE_RETURNVALUE,
MSG_TYPE_EXCEPTION,
MSG_TYPE_HEARTBEAT,
MSG_TYPE_MIGRATIONJOBFROMTAPEGATEWAY,
MSG_TYPE_MIGRATIONJOBFROMWRITETP,
MSG_TYPE_NBFILESONTAPE,
MSG_TYPE_RECALLJOBFROMREADTP,
MSG_TYPE_RECALLJOBFROMTAPEGATEWAY,
MSG_TYPE_RETURNVALUE,
MSG_TYPE_TAPEMOUNTEDFORMIGRATION,
MSG_TYPE_TAPEMOUNTEDFORRECALL,
MSG_TYPE_TAPEUNMOUNTSTARTED,
......
/******************************************************************************
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/messages/ReplyContainer.hpp"
namespace castor {
namespace messages {
typedef ReplyContainer<TPMAGIC, PROTOCOL_TYPE_TAPE, PROTOCOL_VERSION_1>
ProtoTapeReplyContainer;
} // namespace messages
} // namespace castor
/******************************************************************************
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/messages/Constants.hpp"
#include "castor/messages/Header.pb.h"
#include "castor/messages/messages.hpp"
#include "castor/messages/ReturnValue.pb.h"
#include "castor/messages/ZmqMsg.hpp"
#include "castor/messages/ZmqSocket.hpp"
#include "castor/exception/Exception.hpp"
#include "h/Ctape.h"
namespace castor {
namespace messages {
/**
* This struct is used to read an answer (=header+body)
* over the socket given in the constructor. It will also performs several checks
* and deal with the body if it is a ReturnValue
*It is templated in order to be able to use it with different
* protocolType and protocolVersion
*/
template <int magic, int protocolType, int protocolVersion>
struct ReplyContainer {
castor::messages::Header header;
ZmqMsg blobBody;
template<class T> ReplyContainer(T& socket) {
ZmqMsg blobHeader;
socket.recv(&blobHeader.getZmqMsg());
if(!zmq_msg_more(&blobHeader.getZmqMsg())) {
throw castor::exception::Exception("Expecting a multi part message. Got a header without a body");
}
socket.recv(&blobBody.getZmqMsg());
if(!header.ParseFromArray(blobHeader.getData(),blobHeader.size())){
throw castor::exception::Exception("Message header cant be parsed from binary data read");
}
//check magic, protocolTypa and protocolVerison
if(header.magic() != magic){
throw castor::exception::Exception("Wrong magic number in the header");
}
if(header.protocoltype() != protocolType){
throw castor::exception::Exception("Wrong protocol type in the header");
}
if(header.protocolversion() != protocolVersion){
throw castor::exception::Exception("Wrong protocol version in the header");
}
//check if everything is ok in a answer where we are not expecting any real va;ue
if(header.msgtype() == MSG_TYPE_RETURNVALUE){
castor::messages::ReturnValue body;
// TO BE DONE - SHOUDL DO WITH REPLACMENT FRAME CLASS
//checkSHA1(header,blobBody);
if(!body.ParseFromArray(blobBody.getData(),blobBody.size())){
throw castor::exception::Exception("Expecting a ReturnValue body but"
" cant parse it from the binary");
}
if(body.returnvalue()!=0){
throw castor::exception::Exception(body.message());
}
}
}
private :
ReplyContainer(const ReplyContainer&);
ReplyContainer& operator=(const ReplyContainer&);
}; // struct ReplyContainer
} // namespace messages
} // namespace castor
......@@ -29,9 +29,8 @@ void protobuf_AssignDesc_ReturnValue_2eproto() {
"ReturnValue.proto");
GOOGLE_CHECK(file != NULL);
ReturnValue_descriptor_ = file->message_type(0);
static const int ReturnValue_offsets_[2] = {
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(ReturnValue, returnvalue_),
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(ReturnValue, message_),
static const int ReturnValue_offsets_[1] = {
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(ReturnValue, value_),
};
ReturnValue_reflection_ =
new ::google::protobuf::internal::GeneratedMessageReflection(
......@@ -74,9 +73,8 @@ void protobuf_AddDesc_ReturnValue_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
"\n\021ReturnValue.proto\022\017castor.messages\"3\n\013"
"ReturnValue\022\023\n\013returnValue\030\001 \002(\r\022\017\n\007mess"
"age\030\002 \002(\t", 89);
"\n\021ReturnValue.proto\022\017castor.messages\"\034\n\013"
"ReturnValue\022\r\n\005value\030\001 \002(\r", 66);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"ReturnValue.proto", &protobuf_RegisterTypes);
ReturnValue::default_instance_ = new ReturnValue();
......@@ -94,10 +92,8 @@ struct StaticDescriptorInitializer_ReturnValue_2eproto {
// ===================================================================
const ::std::string ReturnValue::_default_message_;
#ifndef _MSC_VER
const int ReturnValue::kReturnValueFieldNumber;
const int ReturnValue::kMessageFieldNumber;
const int ReturnValue::kValueFieldNumber;
#endif // !_MSC_VER
ReturnValue::ReturnValue()
......@@ -116,8 +112,7 @@ ReturnValue::ReturnValue(const ReturnValue& from)
void ReturnValue::SharedCtor() {
_cached_size_ = 0;
returnvalue_ = 0u;
message_ = const_cast< ::std::string*>(&_default_message_);
value_ = 0u;
::memset(_has_bits_, 0, sizeof(_has_bits_));
}
......@@ -126,9 +121,6 @@ ReturnValue::~ReturnValue() {
}
void ReturnValue::SharedDtor() {
if (message_ != &_default_message_) {
delete message_;
}
if (this != default_instance_) {
}
}
......@@ -155,12 +147,7 @@ ReturnValue* ReturnValue::New() const {
void ReturnValue::Clear() {
if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
returnvalue_ = 0u;
if (_has_bit(1)) {
if (message_ != &_default_message_) {
message_->clear();
}
}
value_ = 0u;
}
::memset(_has_bits_, 0, sizeof(_has_bits_));
mutable_unknown_fields()->Clear();
......@@ -172,34 +159,17 @@ bool ReturnValue::MergePartialFromCodedStream(
::google::protobuf::uint32 tag;
while ((tag = input->ReadTag()) != 0) {
switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) {
// required uint32 returnValue = 1;
// required uint32 value = 1;
case 1: {
if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
::google::protobuf::uint32, ::google::protobuf::internal::WireFormatLite::TYPE_UINT32>(
input, &returnvalue_)));
input, &value_)));
_set_bit(0);
} else {
goto handle_uninterpreted;
}
if (input->ExpectTag(18)) goto parse_message;
break;
}
// required string message = 2;
case 2: {
if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
parse_message:
DO_(::google::protobuf::internal::WireFormatLite::ReadString(
input, this->mutable_message()));
::google::protobuf::internal::WireFormat::VerifyUTF8String(
this->message().data(), this->message().length(),
::google::protobuf::internal::WireFormat::PARSE);
} else {
goto handle_uninterpreted;
}
if (input->ExpectAtEnd()) return true;
break;
}
......@@ -222,18 +192,9 @@ bool ReturnValue::MergePartialFromCodedStream(
void ReturnValue::SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const {
// required uint32 returnValue = 1;
// required uint32 value = 1;
if (_has_bit(0)) {
::google::protobuf::internal::WireFormatLite::WriteUInt32(1, this->returnvalue(), output);
}
// required string message = 2;
if (_has_bit(1)) {
::google::protobuf::internal::WireFormat::VerifyUTF8String(
this->message().data(), this->message().length(),
::google::protobuf::internal::WireFormat::SERIALIZE);
::google::protobuf::internal::WireFormatLite::WriteString(
2, this->message(), output);
::google::protobuf::internal::WireFormatLite::WriteUInt32(1, this->value(), output);
}
if (!unknown_fields().empty()) {
......@@ -244,19 +205,9 @@ void ReturnValue::SerializeWithCachedSizes(
::google::protobuf::uint8* ReturnValue::SerializeWithCachedSizesToArray(
::google::protobuf::uint8* target) const {
// required uint32 returnValue = 1;
// required uint32 value = 1;
if (_has_bit(0)) {
target = ::google::protobuf::internal::WireFormatLite::WriteUInt32ToArray(1, this->returnvalue(), target);
}
// required string message = 2;
if (_has_bit(1)) {
::google::protobuf::internal::WireFormat::VerifyUTF8String(
this->message().data(), this->message().length(),
::google::protobuf::internal::WireFormat::SERIALIZE);
target =
::google::protobuf::internal::WireFormatLite::WriteStringToArray(
2, this->message(), target);
target = ::google::protobuf::internal::WireFormatLite::WriteUInt32ToArray(1, this->value(), target);
}
if (!unknown_fields().empty()) {
......@@ -270,18 +221,11 @@ int ReturnValue::ByteSize() const {
int total_size = 0;
if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
// required uint32 returnValue = 1;
if (has_returnvalue()) {
// required uint32 value = 1;
if (has_value()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::UInt32Size(
this->returnvalue());
}
// required string message = 2;
if (has_message()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::StringSize(
this->message());
this->value());
}
}
......@@ -312,10 +256,7 @@ void ReturnValue::MergeFrom(const ReturnValue& from) {
GOOGLE_CHECK_NE(&from, this);
if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) {
if (from._has_bit(0)) {
set_returnvalue(from.returnvalue());
}
if (from._has_bit(1)) {
set_message(from.message());
set_value(from.value());
}
}
mutable_unknown_fields()->MergeFrom(from.unknown_fields());
......@@ -334,15 +275,14 @@ void ReturnValue::CopyFrom(const ReturnValue& from) {
}
bool ReturnValue::IsInitialized() const {
if ((_has_bits_[0] & 0x00000003) != 0x00000003) return false;
if ((_has_bits_[0] & 0x00000001) != 0x00000001) return false;
return true;
}
void ReturnValue::Swap(ReturnValue* other) {
if (other != this) {
std::swap(returnvalue_, other->returnvalue_);
std::swap(message_, other->message_);
std::swap(value_, other->value_);
std::swap(_has_bits_[0], other->_has_bits_[0]);
_unknown_fields_.Swap(&other->_unknown_fields_);
std::swap(_cached_size_, other->_cached_size_);
......
......@@ -91,36 +91,24 @@ class ReturnValue : public ::google::protobuf::Message {
// accessors -------------------------------------------------------
// required uint32 returnValue = 1;
inline bool has_returnvalue() const;
inline void clear_returnvalue();
static const int kReturnValueFieldNumber = 1;
inline ::google::protobuf::uint32 returnvalue() const;
inline void set_returnvalue(::google::protobuf::uint32 value);
// required string message = 2;
inline bool has_message() const;
inline void clear_message();
static const int kMessageFieldNumber = 2;
inline const ::std::string& message() const;
inline void set_message(const ::std::string& value);
inline void set_message(const char* value);
inline void set_message(const char* value, size_t size);
inline ::std::string* mutable_message();
// required uint32 value = 1;
inline bool has_value() const;
inline void clear_value();
static const int kValueFieldNumber = 1;
inline ::google::protobuf::uint32 value() const;
inline void set_value(::google::protobuf::uint32 value);
// @@protoc_insertion_point(class_scope:castor.messages.ReturnValue)
private:
::google::protobuf::UnknownFieldSet _unknown_fields_;
mutable int _cached_size_;
::google::protobuf::uint32 returnvalue_;
::std::string* message_;
static const ::std::string _default_message_;
::google::protobuf::uint32 value_;
friend void protobuf_AddDesc_ReturnValue_2eproto();
friend void protobuf_AssignDesc_ReturnValue_2eproto();
friend void protobuf_ShutdownFile_ReturnValue_2eproto();
::google::protobuf::uint32 _has_bits_[(2 + 31) / 32];
::google::protobuf::uint32 _has_bits_[(1 + 31) / 32];
// WHY DOES & HAVE LOWER PRECEDENCE THAN != !?
inline bool _has_bit(int index) const {
......@@ -143,62 +131,20 @@ class ReturnValue : public ::google::protobuf::Message {
// ReturnValue
// required uint32 returnValue = 1;
inline bool ReturnValue::has_returnvalue() const {
// required uint32 value = 1;
inline bool ReturnValue::has_value() const {
return _has_bit(0);
}
inline void ReturnValue::clear_returnvalue() {
returnvalue_ = 0u;
inline void ReturnValue::clear_value() {
value_ = 0u;
_clear_bit(0);
}
inline ::google::protobuf::uint32 ReturnValue::returnvalue() const {
return returnvalue_;
inline ::google::protobuf::uint32 ReturnValue::value() const {
return value_;
}
inline void ReturnValue::set_returnvalue(::google::protobuf::uint32 value) {
inline void ReturnValue::set_value(::google::protobuf::uint32 value) {
_set_bit(0);
returnvalue_ = value;
}
// required string message = 2;
inline bool ReturnValue::has_message() const {
return _has_bit(1);
}
inline void ReturnValue::clear_message() {
if (message_ != &_default_message_) {
message_->clear();
}
_clear_bit(1);
}
inline const ::std::string& ReturnValue::message() const {
return *message_;
}
inline void ReturnValue::set_message(const ::std::string& value) {
_set_bit(1);
if (message_ == &_default_message_) {
message_ = new ::std::string;
}
message_->assign(value);
}
inline void ReturnValue::set_message(const char* value) {
_set_bit(1);
if (message_ == &_default_message_) {
message_ = new ::std::string;
}
message_->assign(value);
}
inline void ReturnValue::set_message(const char* value, size_t size) {
_set_bit(1);
if (message_ == &_default_message_) {
message_ = new ::std::string;
}
message_->assign(reinterpret_cast<const char*>(value), size);
}
inline ::std::string* ReturnValue::mutable_message() {
_set_bit(1);
if (message_ == &_default_message_) {
message_ = new ::std::string;
}
return message_;
value_ = value;
}
......
......@@ -18,10 +18,6 @@
package castor.messages;
// Generic return value. It is either returnValue == 0 and message == "" if
// everything is OK or returnValue != 0 and message != "" if something went
// wrong.
message ReturnValue {
required uint32 returnValue = 1;
required string message = 2;
required uint32 value = 1;
}
......@@ -27,9 +27,9 @@
#include "castor/messages/MigrationJobFromTapeGateway.pb.h"
#include "castor/messages/MigrationJobFromWriteTp.pb.h"
#include "castor/messages/NbFilesOnTape.pb.h"
#include "castor/messages/ProtoTapeReplyContainer.hpp"
#include "castor/messages/RecallJobFromReadTp.pb.h"
#include "castor/messages/RecallJobFromTapeGateway.pb.h"
#include "castor/messages/ReturnValue.pb.h"
#include "castor/messages/TapeMountedForRecall.pb.h"
#include "castor/messages/TapeMountedForMigration.pb.h"
#include "castor/messages/TapeserverProxyZmq.hpp"
......@@ -39,9 +39,9 @@
#include "castor/tape/tapegateway/VolumeMode.hpp"
#include "castor/utils/SmartFd.hpp"
#include "castor/utils/utils.hpp"
#include "h/Ctape.h"
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
#include "h/Ctape.h"
//------------------------------------------------------------------------------
// constructor
......@@ -66,7 +66,15 @@ void castor::messages::TapeserverProxyZmq::gotRecallJobFromTapeGateway(
const Frame rqst = createRecallJobFromTapeGatewayFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
messages::ProtoTapeReplyContainer reply(m_tapeserverSocket);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Received an unexpected return value"
": expected=0 actual=" << reply.value();
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
......@@ -74,20 +82,6 @@ void castor::messages::TapeserverProxyZmq::gotRecallJobFromTapeGateway(
"vid=" << vid << " unitName=" << unitName << ": " <<
ne.getMessage().str();
throw ex;
} catch(std::exception &se) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of recall job from tapegateway: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
se.what();
throw ex;
} catch(...) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of recall job from tapegateway: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
"Caught an unknown exception";
throw ex;
}
}
......@@ -128,8 +122,15 @@ void castor::messages::TapeserverProxyZmq::gotRecallJobFromReadTp(
const Frame rqst = createRecallJobFromReadTpFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
messages::ProtoTapeReplyContainer reply(m_tapeserverSocket);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Received an unexpected return value"
": expected=0 actual=" << reply.value();
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
......@@ -137,20 +138,6 @@ void castor::messages::TapeserverProxyZmq::gotRecallJobFromReadTp(
"vid=" << vid << " unitName=" << unitName << ": " <<
ne.getMessage().str();
throw ex;
} catch(std::exception &se) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of recall job from readtp: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
se.what();
throw ex;
} catch(...) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of recall job from readtp: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
"Caught an unknown exception";
throw ex;
}
}
......@@ -190,23 +177,9 @@ uint32_t castor::messages::TapeserverProxyZmq::gotMigrationJobFromTapeGateway(
const Frame rqst = createMigrationJobFromTapeGatewayFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
messages::ProtoTapeReplyContainer rawReply(m_tapeserverSocket);
if(rawReply.header.msgtype() != MSG_TYPE_NBFILESONTAPE) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to receive reply from tapeserverd"
": Unexpected message type: expected=" << MSG_TYPE_NBFILESONTAPE
<< " actual=" << rawReply.header.msgtype();
throw ex;
}
messages::NbFilesOnTape reply;
if(!reply.ParseFromArray(rawReply.blobBody.getData(),
rawReply.blobBody.size())) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to parse reply from tapeserverd"
": msgType=" << rawReply.header.msgtype();
throw ex;
}
return reply.nbfiles();
NbFilesOnTape reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
return reply.nbfiles();
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
......@@ -214,20 +187,6 @@ uint32_t castor::messages::TapeserverProxyZmq::gotMigrationJobFromTapeGateway(
"vid=" << vid << " unitName=" << unitName << ": " <<
ne.getMessage().str();
throw ex;
} catch(std::exception &se) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of migration job from tapegateway: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
se.what();