diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index 87747f45584508a46ab0051a10306848a370bf59..400ddc05a750bc32ebb76b492f482fea862c2f6a 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -49,7 +49,7 @@ void RequestCallback<cta::xrd::Alert>::operator()(const cta::xrd::Alert &alert) * Defines how Data/Stream messages should be handled */ template<> -void XrdSsiPbRequestType::DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen, bool is_last) +void XrdSsiPbRequestType::DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen) { std::cout.write(response_bufptr, response_buflen); } diff --git a/xroot_ssi_pb/XrdSsiPbRequest.hpp b/xroot_ssi_pb/XrdSsiPbRequest.hpp index f44f539b939baa043d1919df364f8cd7431d41e1..5a52c2e25cbf71bfcc9c376070a006dcfa8ee662 100644 --- a/xroot_ssi_pb/XrdSsiPbRequest.hpp +++ b/xroot_ssi_pb/XrdSsiPbRequest.hpp @@ -80,17 +80,27 @@ public: virtual void Alert(XrdSsiRespInfoMsg &alert_msg) override; /*! - * Return the future associated with this object's promise + * Return the future associated with this object's Metadata promise */ - auto GetFuture() { return m_promise.get_future(); } + auto GetMetadataFuture() { return m_metadata_promise.get_future(); } + + /*! + * Return the future associated with this object's Metadata promise + */ +#if 0 + auto GetDataFuture() { return m_data_promise.get_future(); } +#endif private: void ProcessResponseMetadata(); std::string m_request_str; //!< Request buffer + std::unique_ptr<char[]> m_response_buffer; //!< Pointer to storage for Data responses char *m_response_bufptr; //!< Pointer to the Response buffer int m_response_bufsize; //!< Size of the Response buffer - std::promise<MetadataType> m_promise; //!< Promise a reply of Metadata type + + std::promise<MetadataType> m_metadata_promise; //!< Promise a reply of Metadata type + std::promise<void> m_data_promise; //!< Promise a data or stream response RequestCallback<AlertType> AlertCallback; //!< Callback for Alert messages @@ -104,9 +114,8 @@ private: * XrdSsiRequest::PRD_Hold: client is resource-limited and can't handle the queue at this time * @param[in] response_bufptr Pointer to newly-arrived data buffer * @param[in] response_buflen Length of response_bufptr - * @param[in] is_last Flag indicating whether this is the last buffer in the Stream/Data Response */ - void DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen, bool is_last) { + void DataCallback(XrdSsiRequest::PRD_Xeq &post_process, char *response_bufptr, int response_buflen) { throw XrdSsiException("Data/Stream Responses are not implemented."); } }; @@ -157,17 +166,18 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi // Data and Metadata responses case XrdSsiRespInfo::isData: - // Process Metadata ProcessResponseMetadata(); // Process Data if(rInfo.blen > 0) { - // Process Data Response - m_response_bufptr = new char[m_response_bufsize]; + // For Data responses, we need to allocate the buffer to receive the data + m_response_buffer = std::unique_ptr<char[]>(new char[m_response_bufsize]); + + m_response_bufptr = m_response_buffer.get(); - // GetResponseData() copies one chunk of data into the buffer, then calls ProcessResponseData() + // Process Data Response: copy one chunk of data into the buffer, then call ProcessResponseData() GetResponseData(m_response_bufptr, m_response_bufsize); } else { // Response is Metadata-only @@ -184,11 +194,12 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi // Stream response case XrdSsiRespInfo::isStream: - // Process Metadata ProcessResponseMetadata(); - // Process Data Response + // For Stream responses, the framework allocates the buffer to receive the data + // Process Stream Response: set m_response_bufptr to point to the next chunk of stream data, + // then call ProcessResponseData() GetResponseData(m_response_bufptr, m_response_bufsize); break; @@ -213,7 +224,7 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi } catch(std::exception &ex) { // Use the exception to fulfil the promise - m_promise.set_exception(std::current_exception()); + m_metadata_promise.set_exception(std::current_exception()); Finished(); delete this; @@ -256,7 +267,7 @@ void Request<RequestType, MetadataType, AlertType>::ProcessResponseMetadata() if(metadata.ParseFromArray(metadata_buffer, metadata_len)) { - m_promise.set_value(metadata); + m_metadata_promise.set_value(metadata); } else { @@ -301,15 +312,18 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType> // The buffer length can be 0 if the response is metadata only if(response_buflen != 0) { - DataCallback(post_process, response_bufptr, response_buflen, is_last); + DataCallback(post_process, response_bufptr, response_buflen); } - if(is_last) + if(is_last) // No more data to come { #ifdef XRDSSI_DEBUG std::cerr << "[DEBUG] ProcessResponseData: done" << std::endl; #endif - // No more data, so clean up + // Clean up + + // Set the data promise + m_data_promise.set_value(); // If Request objects are uniform, we could re-use them instead of deleting them, to avoid the // overhead of repeated object creation. This would require a more complex Request factory. For @@ -361,7 +375,7 @@ void Request<RequestType, MetadataType, AlertType>::Alert(XrdSsiRespInfoMsg &ale } catch(std::exception &ex) { - m_promise.set_exception(std::current_exception()); + m_metadata_promise.set_exception(std::current_exception()); } // catch(...) {} ? // set_exception() can also throw()... diff --git a/xroot_ssi_pb/XrdSsiPbServiceClientSide.hpp b/xroot_ssi_pb/XrdSsiPbServiceClientSide.hpp index 90b39bb4da5fb7150985146d196cb6c630e1aefb..82d23cbcb07d0982754c33f162e136fde2f67454 100644 --- a/xroot_ssi_pb/XrdSsiPbServiceClientSide.hpp +++ b/xroot_ssi_pb/XrdSsiPbServiceClientSide.hpp @@ -150,13 +150,10 @@ template <typename RequestType, typename MetadataType, typename AlertType> MetadataType ServiceClientSide<RequestType, MetadataType, AlertType>::Send(const RequestType &request) { // Instantiate the Request object - auto request_ptr = new Request<RequestType, MetadataType, AlertType>(request, m_response_bufsize, m_request_tmo); - - auto future_response = request_ptr->GetFuture(); + auto future_response = request_ptr->GetMetadataFuture(); // Transfer ownership of the Request to the Service object. - m_server_ptr->ProcessRequest(*request_ptr, m_resource); // After ProcessRequest() returns, it is safe for request_ptr to go out-of-scope, as the framework @@ -164,7 +161,6 @@ MetadataType ServiceClientSide<RequestType, MetadataType, AlertType>::Send(const // we do not need to as we created a reusable Resource. // Wait synchronously for the framework to return its Response (or an exception) - auto response = future_response.get(); return response;