Skip to content
Snippets Groups Projects
Commit f3c38aba authored by Michael Davis's avatar Michael Davis
Browse files

[ssi_af_ls] Corrects memory allocation for Data/Stream responses

In the case of a Stream response, XRootD allocates the memory and
manipulates the buffer pointer. In the case of a Data response, we need
to allocate the buffer ourselves. In both cases, the buffer is treated
in ProcessResponseData(), which does not care how the buffer was filled.

This requires a bit of extra logic to make sure the buffer is
deallocated for Data responses but not Stream responses. I have done
this using a unique pointer to allocate a char[] for Data responses.
and char *m_buffer_ptr points to the memory allocated.

We never delete m_buffer_ptr as memory allocation is handled either
by the unique_ptr or by XRoot and we don't care which.
parent b82e9f3c
Branches
Tags
No related merge requests found
...@@ -49,7 +49,7 @@ void RequestCallback<cta::xrd::Alert>::operator()(const cta::xrd::Alert &alert) ...@@ -49,7 +49,7 @@ void RequestCallback<cta::xrd::Alert>::operator()(const cta::xrd::Alert &alert)
* Defines how Data/Stream messages should be handled * Defines how Data/Stream messages should be handled
*/ */
template<> template<>
void XrdSsiPbRequestType::DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen, bool is_last) void XrdSsiPbRequestType::DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen)
{ {
std::cout.write(response_bufptr, response_buflen); std::cout.write(response_bufptr, response_buflen);
} }
......
...@@ -80,17 +80,27 @@ public: ...@@ -80,17 +80,27 @@ public:
virtual void Alert(XrdSsiRespInfoMsg &alert_msg) override; virtual void Alert(XrdSsiRespInfoMsg &alert_msg) override;
/*! /*!
* Return the future associated with this object's promise * Return the future associated with this object's Metadata promise
*/ */
auto GetFuture() { return m_promise.get_future(); } auto GetMetadataFuture() { return m_metadata_promise.get_future(); }
/*!
* Return the future associated with this object's Metadata promise
*/
#if 0
auto GetDataFuture() { return m_data_promise.get_future(); }
#endif
private: private:
void ProcessResponseMetadata(); void ProcessResponseMetadata();
std::string m_request_str; //!< Request buffer std::string m_request_str; //!< Request buffer
std::unique_ptr<char[]> m_response_buffer; //!< Pointer to storage for Data responses
char *m_response_bufptr; //!< Pointer to the Response buffer char *m_response_bufptr; //!< Pointer to the Response buffer
int m_response_bufsize; //!< Size of the Response buffer int m_response_bufsize; //!< Size of the Response buffer
std::promise<MetadataType> m_promise; //!< Promise a reply of Metadata type
std::promise<MetadataType> m_metadata_promise; //!< Promise a reply of Metadata type
std::promise<void> m_data_promise; //!< Promise a data or stream response
RequestCallback<AlertType> AlertCallback; //!< Callback for Alert messages RequestCallback<AlertType> AlertCallback; //!< Callback for Alert messages
...@@ -104,9 +114,8 @@ private: ...@@ -104,9 +114,8 @@ private:
* XrdSsiRequest::PRD_Hold: client is resource-limited and can't handle the queue at this time * XrdSsiRequest::PRD_Hold: client is resource-limited and can't handle the queue at this time
* @param[in] response_bufptr Pointer to newly-arrived data buffer * @param[in] response_bufptr Pointer to newly-arrived data buffer
* @param[in] response_buflen Length of response_bufptr * @param[in] response_buflen Length of response_bufptr
* @param[in] is_last Flag indicating whether this is the last buffer in the Stream/Data Response
*/ */
void DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen, bool is_last) { void DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen) {
throw XrdSsiException("Data/Stream Responses are not implemented."); throw XrdSsiException("Data/Stream Responses are not implemented.");
} }
}; };
...@@ -157,17 +166,18 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi ...@@ -157,17 +166,18 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi
// Data and Metadata responses // Data and Metadata responses
case XrdSsiRespInfo::isData: case XrdSsiRespInfo::isData:
// Process Metadata // Process Metadata
ProcessResponseMetadata(); ProcessResponseMetadata();
// Process Data // Process Data
if(rInfo.blen > 0) if(rInfo.blen > 0)
{ {
// Process Data Response // For Data responses, we need to allocate the buffer to receive the data
m_response_bufptr = new char[m_response_bufsize]; m_response_buffer = std::unique_ptr<char[]>(new char[m_response_bufsize]);
m_response_bufptr = m_response_buffer.get();
// GetResponseData() copies one chunk of data into the buffer, then calls ProcessResponseData() // Process Data Response: copy one chunk of data into the buffer, then call ProcessResponseData()
GetResponseData(m_response_bufptr, m_response_bufsize); GetResponseData(m_response_bufptr, m_response_bufsize);
} else { // Response is Metadata-only } else { // Response is Metadata-only
...@@ -184,11 +194,12 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi ...@@ -184,11 +194,12 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi
// Stream response // Stream response
case XrdSsiRespInfo::isStream: case XrdSsiRespInfo::isStream:
// Process Metadata // Process Metadata
ProcessResponseMetadata(); ProcessResponseMetadata();
// Process Data Response // For Stream responses, the framework allocates the buffer to receive the data
// Process Stream Response: set m_response_bufptr to point to the next chunk of stream data,
// then call ProcessResponseData()
GetResponseData(m_response_bufptr, m_response_bufsize); GetResponseData(m_response_bufptr, m_response_bufsize);
break; break;
...@@ -213,7 +224,7 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi ...@@ -213,7 +224,7 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi
} catch(std::exception &ex) { } catch(std::exception &ex) {
// Use the exception to fulfil the promise // Use the exception to fulfil the promise
m_promise.set_exception(std::current_exception()); m_metadata_promise.set_exception(std::current_exception());
Finished(); Finished();
delete this; delete this;
...@@ -256,7 +267,7 @@ void Request<RequestType, MetadataType, AlertType>::ProcessResponseMetadata() ...@@ -256,7 +267,7 @@ void Request<RequestType, MetadataType, AlertType>::ProcessResponseMetadata()
if(metadata.ParseFromArray(metadata_buffer, metadata_len)) if(metadata.ParseFromArray(metadata_buffer, metadata_len))
{ {
m_promise.set_value(metadata); m_metadata_promise.set_value(metadata);
} }
else else
{ {
...@@ -301,15 +312,18 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType> ...@@ -301,15 +312,18 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType>
// The buffer length can be 0 if the response is metadata only // The buffer length can be 0 if the response is metadata only
if(response_buflen != 0) if(response_buflen != 0)
{ {
DataCallback(post_process, response_bufptr, response_buflen, is_last); DataCallback(post_process, response_bufptr, response_buflen);
} }
if(is_last) if(is_last) // No more data to come
{ {
#ifdef XRDSSI_DEBUG #ifdef XRDSSI_DEBUG
std::cerr << "[DEBUG] ProcessResponseData: done" << std::endl; std::cerr << "[DEBUG] ProcessResponseData: done" << std::endl;
#endif #endif
// No more data, so clean up // Clean up
// Set the data promise
m_data_promise.set_value();
// If Request objects are uniform, we could re-use them instead of deleting them, to avoid the // If Request objects are uniform, we could re-use them instead of deleting them, to avoid the
// overhead of repeated object creation. This would require a more complex Request factory. For // overhead of repeated object creation. This would require a more complex Request factory. For
...@@ -361,7 +375,7 @@ void Request<RequestType, MetadataType, AlertType>::Alert(XrdSsiRespInfoMsg &ale ...@@ -361,7 +375,7 @@ void Request<RequestType, MetadataType, AlertType>::Alert(XrdSsiRespInfoMsg &ale
} }
catch(std::exception &ex) catch(std::exception &ex)
{ {
m_promise.set_exception(std::current_exception()); m_metadata_promise.set_exception(std::current_exception());
} }
// catch(...) {} ? // catch(...) {} ?
// set_exception() can also throw()... // set_exception() can also throw()...
......
...@@ -150,13 +150,10 @@ template <typename RequestType, typename MetadataType, typename AlertType> ...@@ -150,13 +150,10 @@ template <typename RequestType, typename MetadataType, typename AlertType>
MetadataType ServiceClientSide<RequestType, MetadataType, AlertType>::Send(const RequestType &request) MetadataType ServiceClientSide<RequestType, MetadataType, AlertType>::Send(const RequestType &request)
{ {
// Instantiate the Request object // Instantiate the Request object
auto request_ptr = new Request<RequestType, MetadataType, AlertType>(request, m_response_bufsize, m_request_tmo); auto request_ptr = new Request<RequestType, MetadataType, AlertType>(request, m_response_bufsize, m_request_tmo);
auto future_response = request_ptr->GetMetadataFuture();
auto future_response = request_ptr->GetFuture();
// Transfer ownership of the Request to the Service object. // Transfer ownership of the Request to the Service object.
m_server_ptr->ProcessRequest(*request_ptr, m_resource); m_server_ptr->ProcessRequest(*request_ptr, m_resource);
// After ProcessRequest() returns, it is safe for request_ptr to go out-of-scope, as the framework // After ProcessRequest() returns, it is safe for request_ptr to go out-of-scope, as the framework
...@@ -164,7 +161,6 @@ MetadataType ServiceClientSide<RequestType, MetadataType, AlertType>::Send(const ...@@ -164,7 +161,6 @@ MetadataType ServiceClientSide<RequestType, MetadataType, AlertType>::Send(const
// we do not need to as we created a reusable Resource. // we do not need to as we created a reusable Resource.
// Wait synchronously for the framework to return its Response (or an exception) // Wait synchronously for the framework to return its Response (or an exception)
auto response = future_response.get(); auto response = future_response.get();
return response; return response;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment