XrdSsiPbRequest.h 8.4 KB
Newer Older
1
2
#ifndef __XRD_SSI_PB_REQUEST_H
#define __XRD_SSI_PB_REQUEST_H
Michael Davis's avatar
Michael Davis committed
3
4
5

#include <cstdint>
#include <iostream>
#include <string>

#include <XrdSsi/XrdSsiRequest.hh>

Michael Davis's avatar
Michael Davis committed
6
7


8
9
// XRootD SSI + Protocol Buffers callbacks: The client should specialize on this class for each XRootD reply type
// (Response, Metadata, Alert, Error)
Michael Davis's avatar
Michael Davis committed
10
11
12
13
14
15
16
17
18
19
20
21

// Callback functor template, parameterized on the type of the reply it handles.
//
// Only the call operator is declared in this header section; no definition is visible here, so a
// definition (e.g. a specialization per CallbackArg type) has to be supplied by the client code.
template<typename CallbackArg>
class XrdSsiPbRequestCallback
{
public:
   // Handle a single reply.
   //
   // arg: the reply object (or error string) to process; passed by const reference.
   void operator()(const CallbackArg &arg);
};



// XRootD SSI + Protocol Buffers Request class

22
template <typename RequestType, typename ResponseType, typename MetadataType, typename AlertType>
23
class XrdSsiPbRequest : public XrdSsiRequest
Michael Davis's avatar
Michael Davis committed
24
25
{
public:
26
27
28
29
           XrdSsiPbRequest(const std::string &buffer_str, unsigned int response_bufsize, uint16_t timeout) :
              m_request_bufptr(buffer_str.c_str()),
              m_request_len(buffer_str.size()),
              m_response_bufsize(response_bufsize)
Michael Davis's avatar
Michael Davis committed
30
           {
31
32
33
34
35
36
              std::cerr << "Creating XrdSsiPbRequest object:" << std::endl;
              std::cerr << "  Response buffer size = " << m_response_bufsize << std::endl;
              std::cerr << "  Response timeout = " << timeout << std::endl;

              // Set response timeout

37
              SetTimeOut(timeout);
Michael Davis's avatar
Michael Davis committed
38
           }
39
   virtual ~XrdSsiPbRequest() 
Michael Davis's avatar
Michael Davis committed
40
           {
41
              std::cerr << "Deleting XrdSsiPbRequest object" << std::endl;
Michael Davis's avatar
Michael Davis committed
42
43
           }

Michael Davis's avatar
Michael Davis committed
44
45
46
47
48
49
   // It is up to the implementation to create request data, save it in some manner, and provide it to
   // the framework when GetRequest() is called. Optionally define the RelRequestBuffer() method to
   // clean up when the framework no longer needs access to the data.
   //
   // The thread used to initiate a request may be the same one used in the GetRequest() call.

50
   virtual char *GetRequest(int &reqlen) override { reqlen = m_request_len; return const_cast<char*>(m_request_bufptr); }
Michael Davis's avatar
Michael Davis committed
51
52
53
54
55
56

   // Requests are sent to the server asynchronously via the service object. The ProcessResponse() callback
   // is used to inform the request object if the request completed or failed.

   virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo) override;

57
   // ProcessResponseData() is an optional callback used in conjunction with the request's GetResponseData() method,
Michael Davis's avatar
Michael Davis committed
58
59
60
61
62
63
64
65
66
67
68
   // or when the response is a data stream and you wish to asynchronously receive data via the stream. Most
   // applications will need to implement this as scalable applications generally require that any large amount of
   // data be asynchronously received.

   virtual XrdSsiRequest::PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int blen, bool last) override;

   // Alert method is optional, by default Alert messages are ignored

   virtual void Alert(XrdSsiRespInfoMsg &aMsg) override;

private:
69
70
71
72
73
   // Pointer to the Request buffer

   const char *m_request_bufptr;
   int         m_request_len;

74
   // Response buffer size
75
76

   int         m_response_bufsize;
Michael Davis's avatar
Michael Davis committed
77

78
   // Callbacks for each of the XRootD reply types
Michael Davis's avatar
Michael Davis committed
79

80
   XrdSsiPbRequestCallback<ResponseType> ResponseCallback;
Michael Davis's avatar
Michael Davis committed
81
82
83
84
85
86
   XrdSsiPbRequestCallback<MetadataType> MetadataCallback;
   XrdSsiPbRequestCallback<AlertType>    AlertCallback;

   // Additional callback for handling errors from the XRootD framework

   XrdSsiPbRequestCallback<std::string>  ErrorCallback;
Michael Davis's avatar
Michael Davis committed
87
88
};

89
90
91
92


// Process the response

93
template<typename RequestType, typename ResponseType, typename MetadataType, typename AlertType>
94
bool XrdSsiPbRequest<RequestType, ResponseType, MetadataType, AlertType>::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo)
95
{
96
   std::cerr << "ProcessResponse() callback called with response type = " << rInfo.State() << std::endl;
97

Michael Davis's avatar
Michael Davis committed
98
   switch(rInfo.rType)
99
   {
Michael Davis's avatar
Michael Davis committed
100
      // Handle errors in the XRootD framework (e.g. no response from server)
101

Michael Davis's avatar
Michael Davis committed
102
103
104
      case XrdSsiRespInfo::isError:
         ErrorCallback(eInfo.Get());
         Finished();    // Return control of the object to the calling thread and delete rInfo
105

Michael Davis's avatar
Michael Davis committed
106
107
         // Andy says it is now safe to delete the Request object, which implies that the pointer on the calling side
         // will never refer to it again and the destructor of the base class doesn't access any class members.
108

Michael Davis's avatar
Michael Davis committed
109
110
         delete this;
         break;
111

Michael Davis's avatar
Michael Davis committed
112
113
114
115
116
117
      case XrdSsiRespInfo::isHandle:
         // To implement detached requests, add another callback type which saves the handle
         ErrorCallback("Detached requests are not implemented.");
         Finished();
         delete this;
         break;
118

Michael Davis's avatar
Michael Davis committed
119
120
121
122
123
124
      case XrdSsiRespInfo::isFile:
         // To implement file requests, add another callback type
         ErrorCallback("File requests are not implemented.");
         Finished();
         delete this;
         break;
125

Michael Davis's avatar
Michael Davis committed
126
127
128
129
130
131
      case XrdSsiRespInfo::isStream:
         // To implement stream requests, add another callback type
         ErrorCallback("Stream requests are not implemented.");
         Finished();
         delete this;
         break;
132

Michael Davis's avatar
Michael Davis committed
133
134
135
136
      case XrdSsiRespInfo::isNone:
      case XrdSsiRespInfo::isData:
         // Check for metadata: Arbitrary metadata can be sent ahead of the response data, for example to
         // describe the response so that it can be handled in the most optimal way.
137

Michael Davis's avatar
Michael Davis committed
138
139
         int metadata_len;
         const char *metadata_buffer = GetMetadata(metadata_len);
140

Michael Davis's avatar
Michael Davis committed
141
142
143
         if(metadata_len > 0)
         {
            // Deserialize the metadata
144

Michael Davis's avatar
Michael Davis committed
145
            const std::string metadata_str(metadata_buffer, metadata_len);
146

Michael Davis's avatar
Michael Davis committed
147
            MetadataType metadata;
148

Michael Davis's avatar
Michael Davis committed
149
150
151
152
153
154
155
            if(!metadata.ParseFromString(metadata_str))
            {
               ErrorCallback("metadata.ParseFromString() failed");
               Finished();
               delete this;
               break;
            }
156

Michael Davis's avatar
Michael Davis committed
157
            MetadataCallback(metadata);
158

Michael Davis's avatar
Michael Davis committed
159
            // If this is a metadata-only response, there is nothing more to do
160

Michael Davis's avatar
Michael Davis committed
161
162
163
164
165
166
167
            if(rInfo.rType == XrdSsiRespInfo::isNone)
            {
               Finished();
               delete this;
               break;
            }
         }
168

169
         // Handle response messages
170

Michael Davis's avatar
Michael Davis committed
171
172
         if(rInfo.rType == XrdSsiRespInfo::isData)
         {
173
            // Allocate response buffer
174

175
            char *response_bufptr = new char[m_response_bufsize];
176

177
178
179
            // Read the first chunk of data into the buffer, and pass it to ProcessResponseData()

            GetResponseData(response_bufptr, m_response_bufsize);
Michael Davis's avatar
Michael Davis committed
180
         }
181
182
   }

Michael Davis's avatar
Michael Davis committed
183
   return true;
184
185
186
187
}



188
template<typename RequestType, typename ResponseType, typename MetadataType, typename AlertType>
189
190
XrdSsiRequest::PRD_Xeq XrdSsiPbRequest<RequestType, ResponseType, MetadataType, AlertType>
             ::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last)
191
{
192
   // The buffer length can be 0 if the response is metadata only
193

194
195
196
   if(response_buflen != 0)
   {
      // How do we handle message boundaries for multi-block responses?
197

198
      // Deserialize the response
199

200
      const std::string response_str(response_bufptr, response_buflen);
201

202
      ResponseType response;
203

204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
      if(response.ParseFromString(response_str))
      {
         ResponseCallback(response);
      }
      else
      {
         ErrorCallback("response.ParseFromString() failed");
      }
   }

   // If there is more data then get it, otherwise clean up

   if(!is_last)
   {
      // Get the next chunk of data:
      // Does this call this method recursively? Is there danger of a stack overflow?
220

221
      GetResponseData(response_bufptr, m_response_bufsize);
222
223
224
   }
   else
   {
225
      delete[] response_bufptr;
226

227
228
      // Note that if request objects are uniform, you may want to re-use them instead
      // of deleting them, to avoid the overhead of repeated object creation.
229

230
231
      Finished();
      delete this;
232
233
234
   }

   return XrdSsiRequest::PRD_Normal; // Indicate what type of post-processing is required (normal in this case)
235
                                     // If we can't handle the queue at this time, return XrdSsiRequest::PRD_Hold;
236
237
238
239
}



240
template<typename RequestType, typename ResponseType, typename MetadataType, typename AlertType>
241
void XrdSsiPbRequest<RequestType, ResponseType, MetadataType, AlertType>::Alert(XrdSsiRespInfoMsg &alert_msg)
242
{
243
244
245
246
247
248
   // Get the Alert

   int alert_len;
   char *alert_buffer = alert_msg.GetMsg(alert_len);

   // Deserialize the Alert
249

250
   const std::string alert_str(alert_buffer, alert_len);
251

252
   AlertType alert;
253

254
255
256
257
258
259
260
261
   if(alert.ParseFromString(alert_str))
   {
      AlertCallback(alert);
   }
   else
   {
      ErrorCallback("alert.ParseFromString() failed");
   }
262

263
   // Recycle the message to free memory
264

265
   alert_msg.RecycleMsg();
266
267
}

Michael Davis's avatar
Michael Davis committed
268
#endif