Skip to content
Snippets Groups Projects
Commit ea08b486 authored by Michael Davis's avatar Michael Davis
Browse files

[ssi_af_ls] Adds DataCallback to process data/stream responses

parent 3111ee38
Branches
Tags
No related merge requests found
......@@ -35,7 +35,6 @@ namespace XrdSsiPb {
* number of Alert messages to be sent before each Response. These can be used for any purpose defined
* by the client and server, for example writing messages to the client log.
*/
template<typename CallbackArg>
class RequestCallback
{
......@@ -48,7 +47,6 @@ public:
/*!
* Request class
*/
template <typename RequestType, typename MetadataType, typename AlertType>
class Request : public XrdSsiRequest
{
......@@ -77,7 +75,7 @@ public:
virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo) override;
virtual XrdSsiRequest::PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int blen, bool last) override;
virtual XrdSsiRequest::PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last) override;
virtual void Alert(XrdSsiRespInfoMsg &alert_msg) override;
......@@ -89,12 +87,28 @@ public:
private:
void ProcessResponseMetadata();
std::string m_request_str; //!< Request buffer
char *m_response_bufptr; //!< Pointer to the Response buffer
int m_response_bufsize; //!< Size of the Response buffer
std::promise<MetadataType> m_promise; //!< Promise a reply of Metadata type
std::string m_request_str; //!< Request buffer
char *m_response_bufptr; //!< Pointer to the Response buffer
int m_response_bufsize; //!< Size of the Response buffer
std::promise<MetadataType> m_promise; //!< Promise a reply of Metadata type
RequestCallback<AlertType> AlertCallback; //!< Callback for Alert messages
RequestCallback<AlertType> AlertCallback; //!< Callback for Alert messages
/*!
* Callback for Data Responses
*
* This method needs to be specialised for any client which can accept Data or Stream payloads.
*
* @param[in,out] post_process Indicate what type of post-processing should be done by the XrdSsi framework:
* XrdSsiRequest::PRD_Normal (default): normal post-processing
* XrdSsiRequest::PRD_Hold: client is resource-limited and can't handle the queue at this time
* @param[in] response_bufptr Pointer to newly-arrived data buffer
* @param[in] response_buflen Length of response_bufptr
* @param[in] is_last Flag indicating whether this is the last buffer in the Stream/Data Response
*/
void DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen, bool is_last) {
throw XrdSsiException("Data/Stream Responses are not implemented.");
}
};
......@@ -102,7 +116,6 @@ private:
/*!
* Request constructor
*/
template<typename RequestType, typename MetadataType, typename AlertType>
Request<RequestType, MetadataType, AlertType>::
Request(const RequestType &request, unsigned int response_bufsize, uint16_t timeout) :
......@@ -114,11 +127,9 @@ Request(const RequestType &request, unsigned int response_bufsize, uint16_t time
<< " bytes, timeout = " << timeout << std::endl;
#endif
// Set response timeout
SetTimeOut(timeout);
// Serialize the Request
if(!request.SerializeToString(&m_request_str))
{
throw PbException("request.SerializeToString() failed");
......@@ -133,7 +144,6 @@ Request(const RequestType &request, unsigned int response_bufsize, uint16_t time
* Requests are sent to the server asynchronously via the service object. ProcessResponse() informs
* the Request object on the client side if it completed or failed.
*/
template<typename RequestType, typename MetadataType, typename AlertType>
bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo)
{
......@@ -225,7 +235,6 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi
* A Response can (optionally) contain Metadata. This can be used for simple responses (e.g. status
* code, short message) or as the header for large asynchronous data transfers or streaming data.
*/
template<typename RequestType, typename MetadataType, typename AlertType>
void Request<RequestType, MetadataType, AlertType>::ProcessResponseMetadata()
{
......@@ -273,7 +282,6 @@ void Request<RequestType, MetadataType, AlertType>::ProcessResponseMetadata()
* ProcessResponseData() is called either by GetResponseData(), or asynchronously at any time for data
* streams.
*/
template<typename RequestType, typename MetadataType, typename AlertType>
XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType>
::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last)
......@@ -282,7 +290,9 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType>
std::cerr << "[DEBUG] ProcessResponseData(): received " << response_buflen << " bytes of data" << std::endl;
#endif
// If an error occurred setting up the response, the buflen is set to 0
XrdSsiRequest::PRD_Xeq post_process = XrdSsiRequest::PRD_Normal;
// The buffer length is set to -1 if an error occurred setting up the response
if(response_buflen == -1)
{
throw XrdSsiException(eInfo);
......@@ -291,17 +301,15 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType>
// The buffer length can be 0 if the response is metadata only
if(response_buflen != 0)
{
// Handle one block of response data
response_bufptr[response_buflen-1] = 0;
std::cerr << response_bufptr << std::endl;
// TO DO: Provide an interface to the client to read a chunk of data
DataCallback(post_process, response_bufptr, response_buflen, is_last);
}
if(is_last)
{
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] ProcessResponseData: done" << std::endl;
#endif
// No more data, so clean up
std::cerr << "ProcessResponseData: done" << std::endl;
// If Request objects are uniform, we could re-use them instead of deleting them, to avoid the
// overhead of repeated object creation. This would require a more complex Request factory. For
......@@ -312,17 +320,14 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType>
}
else
{
std::cerr << "ProcessResponseData: read next chunk" << std::endl;
#ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] ProcessResponseData: read next chunk" << std::endl;
#endif
// If there is more data, get the next chunk
GetResponseData(m_response_bufptr, m_response_bufsize);
}
// Indicate what type of post-processing is required (normal in this case)
return XrdSsiRequest::PRD_Normal;
// If the client is resource-limited and can't handle the queue at this time,
// return XrdSsiRequest::PRD_Hold;
return post_process;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment