Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
1bea9be4
Commit
1bea9be4
authored
Dec 05, 2008
by
Steven Murray
Browse files
Added place holder for send an acknowledge message to RTCPD.
parent
1637dd50
Changes
11
Hide whitespace changes
Inline
Side-by-side
castor/tape/aggregator/AggregatorDlfMessageConstants.hpp
View file @
1bea9be4
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Dec 5 1
5:05
:0
7
CET 2008
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Dec 5 1
6:58
:0
3
CET 2008
*/
/******************************************************************************
...
...
@@ -57,10 +57,9 @@ AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE=20, /* "Received error message from RTCP
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL
=
21
,
/* "The RtcpdHandlerThread has been passed a NULL socket pointer" */
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO
=
22
,
/* "Received a connection from RTCPD" */
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO
=
23
,
/* "Received a connection from RTCPD without peer host and port information" */
AGGREGATOR_GOT_VID_FROM_RTCPD
=
24
,
/* "Got VID from RTCPD" */
AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD
=
25
,
/* "Failed to VID from RTCPD" */
AGGREGATOR_RECEIVED_TAPE_REQUEST
=
26
,
/* "Received RTCP tape request message from RTCPD" */
AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY
=
27
/* "Failed to unmarshall message body" */
AGGREGATOR_RECEIVED_TAPE_REQUEST
=
24
,
/* "Received RTCP tape request message from RTCPD" */
AGGREGATOR_FAILED_TO_RECEIVE_TAPE_REQUEST
=
25
,
/* "Failed to receive RTCP tape request from RTCPD" */
AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY
=
26
/* "Failed to unmarshall message body" */
};
// enum AggregatorDlfMessages
}
// namespace aggregator
}
// namespace tape
...
...
castor/tape/aggregator/AggregatorDlfMessageStrings.cpp
View file @
1bea9be4
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Dec 5 1
5:05
:0
7
CET 2008
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Dec 5 1
6:58
:0
3
CET 2008
*/
/******************************************************************************
...
...
@@ -53,8 +53,7 @@ castor::dlf::Message castor::tape::aggregator::AggregatorDaemon::s_dlfMessages[]
{
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL
,
"The RtcpdHandlerThread has been passed a NULL socket pointer"
},
{
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO
,
"Received a connection from RTCPD"
},
{
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO
,
"Received a connection from RTCPD without peer host and port information"
},
{
AGGREGATOR_GOT_VID_FROM_RTCPD
,
"Got VID from RTCPD"
},
{
AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD
,
"Failed to VID from RTCPD"
},
{
AGGREGATOR_RECEIVED_TAPE_REQUEST
,
"Received RTCP tape request message from RTCPD"
},
{
AGGREGATOR_FAILED_TO_RECEIVE_TAPE_REQUEST
,
"Failed to receive RTCP tape request from RTCPD"
},
{
AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY
,
"Failed to unmarshall message body"
},
{
-
1
,
""
}};
castor/tape/aggregator/AggregatorDlfMessages.csv
View file @
1bea9be4
...
...
@@ -22,7 +22,6 @@ AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE,20,"Received error message from RTCPD"
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL,21,"The RtcpdHandlerThread has been passed a NULL socket pointer"
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO,22,"Received a connection from RTCPD"
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO,23,"Received a connection from RTCPD without peer host and port information"
AGGREGATOR_GOT_VID_FROM_RTCPD,24,"Got VID from RTCPD"
AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD,25,"Failed to VID from RTCPD"
AGGREGATOR_RECEIVED_TAPE_REQUEST,26,"Received RTCP tape request message from RTCPD"
AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY,27,"Failed to unmarshall message body"
AGGREGATOR_RECEIVED_TAPE_REQUEST,24,"Received RTCP tape request message from RTCPD"
AGGREGATOR_FAILED_TO_RECEIVE_TAPE_REQUEST,25,"Failed to receive RTCP tape request from RTCPD"
AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY,26,"Failed to unmarshall message body"
castor/tape/aggregator/Marshaller.cpp
View file @
1bea9be4
...
...
@@ -60,6 +60,7 @@ void castor::tape::aggregator::Marshaller::unmarshallUint8(const char * &src,
void
castor
::
tape
::
aggregator
::
Marshaller
::
marshallUint16
(
uint16_t
src
,
char
*
&
dst
)
throw
(
castor
::
exception
::
Exception
)
{
src
=
htons
(
src
);
marshall
(
src
,
dst
);
}
...
...
@@ -71,6 +72,7 @@ void castor::tape::aggregator::Marshaller::unmarshallUint16(const char * &src,
size_t
&
srcLen
,
uint16_t
&
dst
)
throw
(
castor
::
exception
::
Exception
)
{
unmarshall
(
src
,
srcLen
,
dst
);
dst
=
ntohs
(
dst
);
}
...
...
@@ -80,6 +82,7 @@ void castor::tape::aggregator::Marshaller::unmarshallUint16(const char * &src,
void
castor
::
tape
::
aggregator
::
Marshaller
::
marshallUint32
(
uint32_t
src
,
char
*
&
dst
)
throw
(
castor
::
exception
::
Exception
)
{
src
=
htonl
(
src
);
marshall
(
src
,
dst
);
}
...
...
@@ -91,6 +94,7 @@ void castor::tape::aggregator::Marshaller::unmarshallUint32(const char * &src,
size_t
&
srcLen
,
uint32_t
&
dst
)
throw
(
castor
::
exception
::
Exception
)
{
unmarshall
(
src
,
srcLen
,
dst
);
dst
=
ntohl
(
dst
);
}
...
...
castor/tape/aggregator/Marshaller.hpp
View file @
1bea9be4
...
...
@@ -46,6 +46,12 @@ namespace aggregator {
class
Marshaller
{
private:
/**
* Function template to help in marshalling simple data types. Please note
* that this function does not perform a host to network byte order
* conversion. If such a conversion is necessary, then it must be done
* before calling this function.
*/
template
<
class
T
>
static
void
marshall
(
T
src
,
char
*
&
dst
)
throw
(
castor
::
exception
::
Exception
)
{
...
...
@@ -56,13 +62,17 @@ namespace aggregator {
throw
ex
;
}
src
=
htonl
(
src
);
memcpy
(
dst
,
&
src
,
sizeof
(
src
));
dst
+=
sizeof
(
src
);
}
/**
* Function template to help in unmarshalling simple data types. Please
* note that this function does not perform a network to host byte order
* conversion. If such a conversion is necessary, then it must be done
* after this function has returned.
*/
template
<
class
T
>
static
void
unmarshall
(
const
char
*
&
src
,
size_t
&
srcLen
,
T
&
dst
)
throw
(
castor
::
exception
::
Exception
)
{
...
...
@@ -83,7 +93,6 @@ namespace aggregator {
}
memcpy
(
&
dst
,
src
,
sizeof
(
dst
));
dst
=
ntohl
(
dst
);
src
+=
sizeof
(
dst
);
srcLen
-=
sizeof
(
dst
);
...
...
castor/tape/aggregator/RcpJobReply.hpp
View file @
1bea9be4
...
...
@@ -41,7 +41,7 @@ namespace aggregator {
struct
RcpJobReply
{
uint32_t
status
;
char
errorMessage
[
1024
];
};
// struct RtcpTapeRequest
};
}
// namespace aggregator
}
// namespace tape
...
...
castor/tape/aggregator/RtcpdAcknowledge.hpp
0 → 100644
View file @
1bea9be4
/******************************************************************************
* castor/tape/aggregator/RtcpdAcknowledge.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#ifndef CASTOR_TAPE_AGGREGATOR_RTCPDACKNOWLEDGE
#define CASTOR_TAPE_AGGREGATOR_RTCPDACKNOWLEDGE
#include
"h/Castor_limits.h"
#include
<stdint.h>
#include
<string>
namespace
castor
{
namespace
tape
{
namespace
aggregator
{
/**
* An RTCPD acknowledge message.
*/
struct
RtcpdAcknowledge
{
uint32_t
magic
;
uint32_t
reqtype
;
uint32_t
status
;
};
}
// namespace aggregator
}
// namespace tape
}
// namespace castor
#endif // CASTOR_TAPE_AGGREGATOR_RTCPDACKNOWLEDGE
castor/tape/aggregator/RtcpdHandlerThread.cpp
View file @
1bea9be4
...
...
@@ -104,38 +104,47 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param)
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO
,
3
,
params
);
}
std
::
string
vid
;
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just before getVidFromRtcpd"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// Get the tape arequest associated with the remote copy job from RTCPD
RtcpTapeRequest
request
;
try
{
vid
=
getVid
FromRtcpd
(
cuuid
,
*
socket
,
NETRWTIMEOUT
);
getTapeRequest
FromRtcpd
(
cuuid
,
*
socket
,
NETRWTIMEOUT
,
request
);
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after getVidFromRtcpd"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"VID"
,
vid
)};
castor
::
dlf
::
Param
(
"vid"
,
request
.
vid
),
castor
::
dlf
::
Param
(
"vsn"
,
request
.
vsn
),
castor
::
dlf
::
Param
(
"label"
,
request
.
label
),
castor
::
dlf
::
Param
(
"devtype"
,
request
.
devtype
),
castor
::
dlf
::
Param
(
"density"
,
request
.
density
),
castor
::
dlf
::
Param
(
"unit"
,
request
.
unit
),
castor
::
dlf
::
Param
(
"VolReqID"
,
request
.
VolReqID
),
castor
::
dlf
::
Param
(
"jobID"
,
request
.
jobID
),
castor
::
dlf
::
Param
(
"mode"
,
request
.
mode
),
castor
::
dlf
::
Param
(
"start_file"
,
request
.
start_file
),
castor
::
dlf
::
Param
(
"end_file"
,
request
.
end_file
),
castor
::
dlf
::
Param
(
"side"
,
request
.
side
),
castor
::
dlf
::
Param
(
"tprc"
,
request
.
tprc
),
castor
::
dlf
::
Param
(
"TStartRequest"
,
request
.
TStartRequest
),
castor
::
dlf
::
Param
(
"TEndRequest"
,
request
.
TEndRequest
),
castor
::
dlf
::
Param
(
"TStartRtcpd"
,
request
.
TStartRtcpd
),
castor
::
dlf
::
Param
(
"TStartMount"
,
request
.
TStartMount
),
castor
::
dlf
::
Param
(
"TEndMount"
,
request
.
TEndMount
),
castor
::
dlf
::
Param
(
"TStartUnmount"
,
request
.
TStartUnmount
),
castor
::
dlf
::
Param
(
"TEndUnmount"
,
request
.
TEndUnmount
),
castor
::
dlf
::
Param
(
"rtcpReqId"
,
request
.
rtcpReqId
),
castor
::
dlf
::
Param
(
"err.errmsgtxt"
,
request
.
err
.
errmsgtxt
),
castor
::
dlf
::
Param
(
"err.severity"
,
request
.
err
.
severity
),
castor
::
dlf
::
Param
(
"err.errorcode"
,
request
.
err
.
errorcode
),
castor
::
dlf
::
Param
(
"err.max_tpretry"
,
request
.
err
.
max_tpretry
),
castor
::
dlf
::
Param
(
"err.max_cpretry"
,
request
.
err
.
max_cpretry
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_
GOT_VID_FROM_RTCPD
,
1
,
params
);
AGGREGATOR_
RECEIVED_TAPE_REQUEST
,
26
,
params
);
}
catch
(
castor
::
exception
::
Exception
&
ex
)
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
ex
.
getMessage
().
str
()),
castor
::
dlf
::
Param
(
"Code"
,
ex
.
code
())};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_ERROR
,
AGGREGATOR_FAILED_TO_
GET_VID_FROM_RTCPD
,
3
,
params
);
AGGREGATOR_FAILED_TO_
RECEIVE_TAPE_REQUEST
,
3
,
params
);
}
// Close and de-allocate the socket
...
...
@@ -153,25 +162,17 @@ void castor::tape::aggregator::RtcpdHandlerThread::stop()
//-----------------------------------------------------------------------------
// get
Vid
FromRtcpd
// get
TapeRequest
FromRtcpd
//-----------------------------------------------------------------------------
std
::
string
castor
::
tape
::
aggregator
::
RtcpdHandlerThread
::
getVidFromRtcpd
(
const
Cuuid_t
&
cuuid
,
castor
::
io
::
AbstractTCPSocket
&
socket
,
const
int
netReadWriteTimeout
)
throw
(
castor
::
exception
::
Exception
)
{
void
castor
::
tape
::
aggregator
::
RtcpdHandlerThread
::
getTapeRequestFromRtcpd
(
const
Cuuid_t
&
cuuid
,
castor
::
io
::
AbstractTCPSocket
&
socket
,
const
int
netReadWriteTimeout
,
RtcpTapeRequest
&
request
)
throw
(
castor
::
exception
::
Exception
)
{
char
buf
[
MSGBUFSIZ
];
RtcpTapeRequest
request
;
memset
(
&
request
,
'\0'
,
sizeof
(
request
));
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after memset(&request, '
\\
0', sizeof(request));"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// Marshall the request for VID message
size_t
totalLen
=
0
;
try
{
...
...
@@ -186,14 +187,6 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
throw
ie
;
}
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after marshall of request for VID message"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// Send the request for a VID to RTCPD
try
{
SocketHelper
::
writeBytes
(
socket
,
netReadWriteTimeout
,
totalLen
,
buf
);
...
...
@@ -205,18 +198,10 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
<<
ex
.
getMessage
().
str
();
}
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after send of request for a VID to RTCPD"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// Receive acknowledge from RTCPD
uint32_t
status
=
0
;
RtcpdAcknowledge
ackMsg
;
try
{
status
=
receiveRtcpdAcknowledge
(
cuuid
,
socket
,
netReadWriteTimeout
);
receiveRtcpdAcknowledge
(
cuuid
,
socket
,
netReadWriteTimeout
,
ackMsg
);
}
catch
(
castor
::
exception
::
Exception
&
ex
)
{
castor
::
exception
::
Exception
ex2
(
EPROTO
);
...
...
@@ -226,12 +211,37 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
throw
ex2
;
}
// If the magic number is invalid
if
(
ackMsg
.
magic
!=
RTCOPY_MAGIC
)
{
castor
::
exception
::
Exception
ex
(
EINVAL
);
ex
.
getMessage
()
<<
__PRETTY_FUNCTION__
<<
": Invalid magic number from RTCPD"
<<
": Expected: 0x"
<<
std
::
hex
<<
RTCOPY_MAGIC
<<
": Received: "
<<
ackMsg
.
magic
;
throw
ex
;
}
// If the request type is invalid
if
(
ackMsg
.
reqtype
!=
RTCP_TAPEERR_REQ
)
{
castor
::
exception
::
Exception
ex
(
EINVAL
);
ex
.
getMessage
()
<<
__PRETTY_FUNCTION__
<<
": Invalid request type from RTCPD"
<<
": Expected: 0x"
<<
std
::
hex
<<
RTCP_TAPEERR_REQ
<<
": Received: "
<<
ackMsg
.
reqtype
;
throw
ex
;
}
// If the acknowledge is negative
if
(
status
!=
0
)
{
if
(
ackMsg
.
status
!=
0
)
{
castor
::
exception
::
Exception
ex
(
ECANCELED
);
ex
.
getMessage
()
<<
__PRETTY_FUNCTION__
<<
": Received negative acknowledge from RTCPD"
;
<<
": Received negative acknowledge from RTCPD"
": Status: "
<<
ackMsg
.
status
;
throw
ex
;
}
...
...
@@ -248,38 +258,6 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
throw
ex2
;
}
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"vid"
,
request
.
vid
),
castor
::
dlf
::
Param
(
"vsn"
,
request
.
vsn
),
castor
::
dlf
::
Param
(
"label"
,
request
.
label
),
castor
::
dlf
::
Param
(
"devtype"
,
request
.
devtype
),
castor
::
dlf
::
Param
(
"density"
,
request
.
density
),
castor
::
dlf
::
Param
(
"unit"
,
request
.
unit
),
castor
::
dlf
::
Param
(
"VolReqID"
,
request
.
VolReqID
),
castor
::
dlf
::
Param
(
"jobID"
,
request
.
jobID
),
castor
::
dlf
::
Param
(
"mode"
,
request
.
mode
),
castor
::
dlf
::
Param
(
"start_file"
,
request
.
start_file
),
castor
::
dlf
::
Param
(
"end_file"
,
request
.
end_file
),
castor
::
dlf
::
Param
(
"side"
,
request
.
side
),
castor
::
dlf
::
Param
(
"tprc"
,
request
.
tprc
),
castor
::
dlf
::
Param
(
"TStartRequest"
,
request
.
TStartRequest
),
castor
::
dlf
::
Param
(
"TEndRequest"
,
request
.
TEndRequest
),
castor
::
dlf
::
Param
(
"TStartRtcpd"
,
request
.
TStartRtcpd
),
castor
::
dlf
::
Param
(
"TStartMount"
,
request
.
TStartMount
),
castor
::
dlf
::
Param
(
"TEndMount"
,
request
.
TEndMount
),
castor
::
dlf
::
Param
(
"TStartUnmount"
,
request
.
TStartUnmount
),
castor
::
dlf
::
Param
(
"TEndUnmount"
,
request
.
TEndUnmount
),
castor
::
dlf
::
Param
(
"rtcpReqId"
,
request
.
rtcpReqId
),
castor
::
dlf
::
Param
(
"err.errmsgtxt"
,
request
.
err
.
errmsgtxt
),
castor
::
dlf
::
Param
(
"err.severity"
,
request
.
err
.
severity
),
castor
::
dlf
::
Param
(
"err.errorcode"
,
request
.
err
.
errorcode
),
castor
::
dlf
::
Param
(
"err.max_tpretry"
,
request
.
err
.
max_tpretry
),
castor
::
dlf
::
Param
(
"err.max_cpretry"
,
request
.
err
.
max_cpretry
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_RECEIVED_TAPE_REQUEST
,
26
,
params
);
}
// Send acknowledge to RTCP
/*
TO BE DONE!!!!!
...
...
@@ -294,31 +272,20 @@ TO BE DONE!!!!!
throw ex;
}
*/
// If there is no volume request ID
if
(
request
.
VolReqID
<=
0
)
{
castor
::
exception
::
Exception
ex
(
EINVAL
);
ex
.
getMessage
()
<<
__PRETTY_FUNCTION__
<<
": Failed to get VID from RTCPD: No volume request ID"
;
throw
ex
;
}
return
request
.
vid
;
}
//-----------------------------------------------------------------------------
// receiveRtcpdAcknowledge
//-----------------------------------------------------------------------------
uint32_t
castor
::
tape
::
aggregator
::
RtcpdHandlerThread
::
receiveRtcpdAcknowledge
(
void
castor
::
tape
::
aggregator
::
RtcpdHandlerThread
::
receiveRtcpdAcknowledge
(
const
Cuuid_t
&
cuuid
,
castor
::
io
::
AbstractTCPSocket
&
socket
,
const
int
netReadWriteTimeout
)
throw
(
castor
::
exception
::
Exception
)
{
const
int
netReadWriteTimeout
,
RtcpdAcknowledge
&
message
)
throw
(
castor
::
exception
::
Exception
)
{
// Read and unmarshall the magic number
uint32_t
magic
=
0
;
try
{
magic
=
SocketHelper
::
readUint32
(
socket
,
NETRWTIMEOUT
);
message
.
magic
=
SocketHelper
::
readUint32
(
socket
,
NETRWTIMEOUT
);
}
catch
(
castor
::
exception
::
Exception
&
ex
)
{
castor
::
exception
::
Internal
ie
;
...
...
@@ -329,31 +296,9 @@ uint32_t castor::tape::aggregator::RtcpdHandlerThread::receiveRtcpdAcknowledge(
throw
ie
;
}
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after read and unmarshall the magic number"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// If the magic number is invalid
if
(
magic
!=
RTCOPY_MAGIC
)
{
castor
::
exception
::
Exception
ex
(
EINVAL
);
ex
.
getMessage
()
<<
__PRETTY_FUNCTION__
<<
": Invalid magic number from RTCPD"
<<
": Expected: 0x"
<<
std
::
hex
<<
RTCOPY_MAGIC
<<
": Received: "
<<
magic
;
throw
ex
;
}
// Read and unmarshall the request type
uint32_t
reqtype
=
0
;
try
{
reqtype
=
SocketHelper
::
readUint32
(
socket
,
NETRWTIMEOUT
);
message
.
reqtype
=
SocketHelper
::
readUint32
(
socket
,
NETRWTIMEOUT
);
}
catch
(
castor
::
exception
::
Exception
&
ex
)
{
castor
::
exception
::
Internal
ie
;
...
...
@@ -364,31 +309,9 @@ uint32_t castor::tape::aggregator::RtcpdHandlerThread::receiveRtcpdAcknowledge(
throw
ie
;
}
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after read and unmarshall the magic number"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// If the request type is invalid
if
(
reqtype
!=
RTCP_TAPEERR_REQ
)
{
castor
::
exception
::
Exception
ex
(
EINVAL
);
ex
.
getMessage
()
<<
__PRETTY_FUNCTION__
<<
": Invalid request type from RTCPD"
<<
": Expected: 0x"
<<
std
::
hex
<<
RTCP_TAPEERR_REQ
<<
": Received: "
<<
reqtype
;
throw
ex
;
}
// Read and unmarshall the status
uint32_t
status
=
0
;
try
{
status
=
SocketHelper
::
readUint32
(
socket
,
NETRWTIMEOUT
);
message
.
status
=
SocketHelper
::
readUint32
(
socket
,
NETRWTIMEOUT
);
}
catch
(
castor
::
exception
::
Exception
&
ex
)
{
castor
::
exception
::
Internal
ie
;
...
...
@@ -398,8 +321,22 @@ uint32_t castor::tape::aggregator::RtcpdHandlerThread::receiveRtcpdAcknowledge(
throw
ie
;
}
}
//-----------------------------------------------------------------------------
// sendRtcpdAcknowledge
//-----------------------------------------------------------------------------
void
castor
::
tape
::
aggregator
::
RtcpdHandlerThread
::
sendRtcpdAcknowledge
(
const
Cuuid_t
&
cuuid
,
castor
::
io
::
AbstractTCPSocket
&
socket
,
const
int
netReadWriteTimeout
,
const
RtcpdAcknowledge
&
message
)
throw
(
castor
::
exception
::
Exception
)
{
castor
::
exception
::
Internal
ie
;
ie
.
getMessage
()
<<
"Not implemented"
;
return
status
;
throw
ie
;
}
...
...
@@ -425,15 +362,6 @@ void castor::tape::aggregator::RtcpdHandlerThread::receiveRtcpTapeRequest(
throw
ex2
;
}
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after read and unmarshall the magic number"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// If the magic number is invalid
if
(
magic
!=
RTCOPY_MAGIC
)
{
castor
::
exception
::
Exception
ex
(
EBADMSG
);
...
...
@@ -461,15 +389,6 @@ void castor::tape::aggregator::RtcpdHandlerThread::receiveRtcpTapeRequest(
throw
ex2
;
}
{
castor
::
dlf
::
Param
params
[]
=
{
castor
::
dlf
::
Param
(
"Function"
,
__PRETTY_FUNCTION__
),
castor
::
dlf
::
Param
(
"Message"
,
"Just after read and unmarshall the magic number"
)};
castor
::
dlf
::
dlf_writep
(
cuuid
,
DLF_LVL_SYSTEM
,
AGGREGATOR_NULL
,
2
,
params
);
}
// If the request type is invalid
if
(
reqtype
!=
RTCP_TAPEERR_REQ
)
{
castor
::
exception
::
Exception
ex
(
EBADMSG
);
...
...
castor/tape/aggregator/RtcpdHandlerThread.hpp
View file @
1bea9be4
...
...
@@ -28,10 +28,9 @@
#include
"castor/io/ServerSocket.hpp"
#include
"castor/server/IThread.hpp"
#include
"castor/server/Queue.hpp"
#include
"castor/tape/aggregator/RtcpdAcknowledge.hpp"
#include
"castor/tape/aggregator/RtcpTapeRequest.hpp"
#include
<map>
namespace
castor
{
namespace
tape
{
...
...
@@ -72,15 +71,16 @@ namespace aggregator {
private:
/**
* Get
the VID associated with the remote copy job
from RTCPD.
* Get
a tape request
from RTCPD.
*
* @param cuuid The ccuid to be used for logging.
* @param socket The socket of the connection with RTCPD.
* @return The VID.
* @param request The request structure to filled with the request from
* RTCPD
*/
std
::
string
getVid
FromRtcpd
(
const
Cuuid_t
&
cuuid
,
castor
::
io
::
AbstractTCPSocket
&
socket
,
const
int
netReadWriteTimeout
)
throw
(
castor
::
exception
::
Exception
);
void
getTapeRequest
FromRtcpd
(
const
Cuuid_t
&
cuuid
,
castor
::
io
::
AbstractTCPSocket
&
socket
,
const
int
netReadWriteTimeout
,
RtcpTapeRequest
&
request
)
throw
(
castor
::
exception
::
Exception
);
/**
* Receives an acknowledge message from RTCPD and returns the status code
...
...
@@ -88,11 +88,20 @@ namespace aggregator {
*
* @param cuuid The ccuid to be used for logging.
* @param socket The socket of the connection with RTCPD.
* @return The status code embedded within the message.
* @param message The message structure to be filled with the acknowledge
* message.
*/
void
receiveRtcpdAcknowledge
(
const
Cuuid_t
&
cuuid
,
castor
::
io
::
AbstractTCPSocket
&
socket
,
const
int
netReadWriteTimeout
,
RtcpdAcknowledge
&
message
)
throw
(
castor
::
exception
::
Exception
);
/**
* Sends the specified RTCPD acknowledge message to RTCPD using the