Commit 7de54158 authored by Michael Davis's avatar Michael Davis
Browse files

Adds TestSsiRequest class

parent 0a9e67a8
*.o
*.so
test.pb.*
test_client
......@@ -15,9 +15,9 @@ all: test_client
LDLIBS=-lprotobuf
test_client: test_client.o test.pb.o
test_client: test_client.o TestSsiRequest.o test.pb.o
test_client.o: test_client.cpp TestSsiService.h test.pb.h
test_client.o: test_client.cpp TestSsiService.h TestSsiRequest.h test.pb.h
test.pb.h: test.proto
#protoc3 --cpp_out=. --plugin=protoc-gen-XrdSsi=./protoc-gen-XrdSsi --XrdSsi_out=. test.proto
......
#include <iostream>
#include "TestSsiRequest.h"
// Process the response
bool TestSsiRequest::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo)
{
using namespace std;
cerr << "ProcessResponse() callback called with response type = " << rInfo.State() << endl;
// Resume handling callbacks if they were on hold
if(!queue_on_hold)
{
cerr << "resuming...";
this->RestartDataResponse(XrdSsiRequest::RDR_All);
}
if (eInfo.hasError())
{
// Handle error using the passed eInfo object
cerr << "Error = " << eInfo.Get() << endl;
Finished(); // Returns control of the object to the calling thread
// Deletes rInfo
// Andy says you can now do a "delete this"
delete this; // well, OK, so long as you are 100% sure that this object was allocated by new,
// that the pointer on the calling side will never refer to it again, that the
// destructor of the base class doesn't access any class members, ...
}
else
{
// Arbitrary metadata can be sent ahead of the response data, for example to describe the
// response so that it can be handled in the most optimum way. To access the metadata, call
// GetMetadata() before calling GetResponseData().
int myMetadataLen;
GetMetadata(myMetadataLen);
if(rInfo.rType == XrdSsiRespInfo::isData && myMetadataLen == 0)
{
cerr << "Response is metadata only." << endl;
// do something with metadata
// clean up
Finished();
delete this;
}
else if(rInfo.rType == XrdSsiRespInfo::isHandle)
{
cerr << "Response is detached, handle = " << endl;
// copy the handle somewhere
// clean up
Finished();
delete this;
}
else
{
// A proper data response type
static const int myBSize = 1024;
char *myBuff = reinterpret_cast<char*>(malloc(myBSize));
GetResponseData(myBuff, myBSize);
}
}
return true; // you should always return true. (So why not make the return type void?)
}
XrdSsiRequest::PRD_Xeq TestSsiRequest::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *myBuff, int myBLen, bool isLast)
{
using namespace std;
// Simulate the scenario where we can't handle the queue at this time
queue_on_hold = false;
if(queue_on_hold)
{
cerr << "Response queue is on hold...";
queue_on_hold = false;
return XrdSsiRequest::PRD_Hold;
}
// GetResponseData() above places the data in the allocated buffer, then calls this method with
// the buffer type and length
cerr << "Called ProcessResponseData with myBLen = " << myBLen << ", isLast = " << isLast << endl;
// myBLen can be 0 if there is no relevant response or response is metadata only
// Process the response data of length myBLen placed in myBuff
// If there is more data then get it, else free the buffer and
// indicate to the framework that we are done
if(!isLast)
{
static const int myBSize = 1024;
// Get the next chunk of data (and call this method recursively ... could this cause a stack overflow ?)
GetResponseData(myBuff, myBSize);
}
else
{
myBuff[myBLen] = 0;
cerr << "Contents of myBuff = " << myBuff << endl;
free(myBuff);
Finished(); // Andy says you can now do a "delete this"
delete this; // Note that if request objects are uniform, you may want to re-use them instead
// of deleting them, to avoid the overhead of repeated object creation.
}
return XrdSsiRequest::PRD_Normal; // Indicate what type of post-processing is required (normal in this case)
}
void TestSsiRequest::Alert(XrdSsiRespInfoMsg &aMsg)
{
using namespace std;
int aMsgLen;
char *aMsgData = aMsg.GetMsg(aMsgLen);
// Process the alert
cout << "Received Alert message: " << aMsgData << endl;
// Failure to recycle the message will cause a memory leak
aMsg.RecycleMsg();
}
#ifndef __TEST_SSI_REQUEST_H
#define __TEST_SSI_REQUEST_H
#include <XrdSsi/XrdSsiRequest.hh>
class TestSsiRequest : public XrdSsiRequest
{
public:
// It is up to the implementation to create request data, save it in some manner, and provide it to
// the framework when GetRequest() is called. Optionally define the RelRequestBuffer() method to
// clean up when the framework no longer needs access to the data.
//
// The thread used to initiate a request may be the same one used in the GetRequest() call.
// Query for Andy: shouldn't the return type for GetRequest be const?
virtual char *GetRequest(int &dlen) override {dlen = reqBLen; return const_cast<char*>(reqBuff);}
// Requests are sent to the server asynchronously via the service object. The ProcessResponse() callback
// is used to inform the request object if the request completed or failed.
virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo) override;
// ProcessReesponseData() is an optional callback used in conjunction with the request's GetResponseData() method,
// or when the response is a data stream and you wish to asynchronously receive data via the stream. Most
// applications will need to implement this as scalable applications generally require that any large amount of
// data be asynchronously received.
virtual XrdSsiRequest::PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int blen, bool last) override;
// Alert method is optional, by default Alert messages are ignored
virtual void Alert(XrdSsiRespInfoMsg &aMsg) override;
// Constructor/Destructor
TestSsiRequest(const std::string &buffer_str, uint16_t tmo=0) : reqBuff(buffer_str.c_str()), reqBLen(buffer_str.length()), queue_on_hold(true)
{
std::cerr << "Creating TestSsiRequest object, setting tmo=" << tmo << std::endl;
this->SetTimeOut(tmo);
}
virtual ~TestSsiRequest()
{
std::cerr << "Deleting TestSsiRequest object" << std::endl;
}
private:
const char *reqBuff;
int reqBLen;
bool queue_on_hold;
};
#endif
......@@ -4,8 +4,9 @@
#include <iostream>
#include <stdexcept>
#include "XrdSsi/XrdSsiProvider.hh"
#include "XrdSsi/XrdSsiService.hh"
#include <XrdSsi/XrdSsiProvider.hh>
#include <XrdSsi/XrdSsiService.hh>
#include "TestSsiRequest.h"
// Probably we want to allow multiple resources, e.g. streaming and non-streaming versions of the service
// Can this be defined in the protobuf definition?
......@@ -19,6 +20,7 @@ const std::string TestSsiResource("/test");
class XrdSsiException : public std::exception
{
public:
XrdSsiException(const std::string &err_msg) : error_msg(err_msg) {}
XrdSsiException(const XrdSsiErrInfo &eInfo) : error_msg(eInfo.Get()) {}
const char* what() const noexcept { return error_msg.c_str(); }
......@@ -35,12 +37,13 @@ extern XrdSsiProvider *XrdSsiProviderClient;
template <typename RequestType, typename ResponseType>
class TestSsiService
{
public:
TestSsiService() = delete;
TestSsiService(std::string hostname, int port) : resource(TestSsiResource)
TestSsiService(std::string hostname, int port, int to = 15) : resource(TestSsiResource), timeout(to)
{
XrdSsiErrInfo eInfo;
......@@ -69,10 +72,39 @@ public:
}
}
void send(RequestType request_msg)
{
// Requests are always executed in the context of a service. They need to correspond to what the service allows.
XrdSsiRequest *requestP;
// Serialize the request object
std::string data;
if(!request_msg.SerializeToString(&data))
{
throw XrdSsiException("SerializeToString() failed");
}
requestP = new TestSsiRequest(data, timeout);
// Transfer ownership of the request to the service object
// TestSsiRequest handles deletion of the data buffer, so we can allow the pointer to go out-of-scope
serverP->ProcessRequest(*requestP, resource);
// Note: it is safe to delete the XrdSsiResource object after ProcessRequest() returns. I don't delete it because
// I am assuming I can reuse it, but I need to check if that is a safe assumption. Perhaps I need to create a new
// resource object for each request?
}
private:
const XrdSsiResource resource; // Requests are bound to this resource
XrdSsiResource resource; // Requests are bound to this resource
XrdSsiService *serverP; // Pointer to XRootD Server object
int timeout; // Server timeout
};
#endif
......@@ -5,19 +5,19 @@
int main(int argc, char *argv[])
{
const std::string host = "localhost";
const int port = 10400;
// Verify that the version of the Google Protocol Buffer library that we linked against is
// compatible with the version of the headers we compiled against
GOOGLE_PROTOBUF_VERIFY_VERSION;
// Obtain a Service Provider
const std::string host = "localhost";
const int port = 10400;
try
{
TestSsiService test_ssi_service(host, port);
// Obtain a Service Provider
TestSsiService<xrdssi::test::Request, xrdssi::test::Result> test_ssi_service(host, port);
// Create a Request object
......@@ -25,36 +25,14 @@ int main(int argc, char *argv[])
request.set_message_text("Archive some file");
// Output message in Json format
// Output message in Json format (for debugging only)
std::cout << "Sending message:" << std::endl;
std::cout << xrdssi::test::MessageToJsonString(request);
// Send the Request to the Service
// Initiate a Request
#if 0
// Requests are always executed in the context of a service. They need to correspond to what the service allows.
XrdSsiRequest *theRequest; // Used for demonstration purposes
// Create a request object (upcast from your implementation)
int reqDLen = 1024;
char reqData[reqDLen];
theRequest = new MyRequest(reqData, reqDLen, 5);
// Transfer ownership of the request to the service object
servP->ProcessRequest(*theRequest, theResource);
// MyRequest object handles deletion of the data buffer, we shall never mention this pointer again...
theRequest = NULL;
// Note: it is safe to delete the XrdSsiResource object after ProcessRequest() returns.
#endif
test_ssi_service.send(request);
// Wait for the response callback
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment