Skip to content
Snippets Groups Projects
Commit 11e4e126 authored by Michael Davis's avatar Michael Davis
Browse files

Adds TestSsiRequest class

parent 46aa5ca6
No related branches found
No related tags found
No related merge requests found
*.o
*.so
test.pb.*
test_client
......@@ -15,9 +15,9 @@ all: test_client
LDLIBS=-lprotobuf
test_client: test_client.o test.pb.o
test_client: test_client.o TestSsiRequest.o test.pb.o
test_client.o: test_client.cpp TestSsiService.h test.pb.h
test_client.o: test_client.cpp TestSsiService.h TestSsiRequest.h test.pb.h
test.pb.h: test.proto
#protoc3 --cpp_out=. --plugin=protoc-gen-XrdSsi=./protoc-gen-XrdSsi --XrdSsi_out=. test.proto
......
#include <iostream>
#include "TestSsiRequest.h"
// Process the response
bool TestSsiRequest::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo)
{
using namespace std;
cerr << "ProcessResponse() callback called with response type = " << rInfo.State() << endl;
// Resume handling callbacks if they were on hold
if(!queue_on_hold)
{
cerr << "resuming...";
this->RestartDataResponse(XrdSsiRequest::RDR_All);
}
if (eInfo.hasError())
{
// Handle error using the passed eInfo object
cerr << "Error = " << eInfo.Get() << endl;
Finished(); // Returns control of the object to the calling thread
// Deletes rInfo
// Andy says you can now do a "delete this"
delete this; // well, OK, so long as you are 100% sure that this object was allocated by new,
// that the pointer on the calling side will never refer to it again, that the
// destructor of the base class doesn't access any class members, ...
}
else
{
// Arbitrary metadata can be sent ahead of the response data, for example to describe the
// response so that it can be handled in the most optimum way. To access the metadata, call
// GetMetadata() before calling GetResponseData().
int myMetadataLen;
GetMetadata(myMetadataLen);
if(rInfo.rType == XrdSsiRespInfo::isData && myMetadataLen == 0)
{
cerr << "Response is metadata only." << endl;
// do something with metadata
// clean up
Finished();
delete this;
}
else if(rInfo.rType == XrdSsiRespInfo::isHandle)
{
cerr << "Response is detached, handle = " << endl;
// copy the handle somewhere
// clean up
Finished();
delete this;
}
else
{
// A proper data response type
static const int myBSize = 1024;
char *myBuff = reinterpret_cast<char*>(malloc(myBSize));
GetResponseData(myBuff, myBSize);
}
}
return true; // you should always return true. (So why not make the return type void?)
}
XrdSsiRequest::PRD_Xeq TestSsiRequest::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *myBuff, int myBLen, bool isLast)
{
using namespace std;
// Simulate the scenario where we can't handle the queue at this time
queue_on_hold = false;
if(queue_on_hold)
{
cerr << "Response queue is on hold...";
queue_on_hold = false;
return XrdSsiRequest::PRD_Hold;
}
// GetResponseData() above places the data in the allocated buffer, then calls this method with
// the buffer type and length
cerr << "Called ProcessResponseData with myBLen = " << myBLen << ", isLast = " << isLast << endl;
// myBLen can be 0 if there is no relevant response or response is metadata only
// Process the response data of length myBLen placed in myBuff
// If there is more data then get it, else free the buffer and
// indicate to the framework that we are done
if(!isLast)
{
static const int myBSize = 1024;
// Get the next chunk of data (and call this method recursively ... could this cause a stack overflow ?)
GetResponseData(myBuff, myBSize);
}
else
{
myBuff[myBLen] = 0;
cerr << "Contents of myBuff = " << myBuff << endl;
free(myBuff);
Finished(); // Andy says you can now do a "delete this"
delete this; // Note that if request objects are uniform, you may want to re-use them instead
// of deleting them, to avoid the overhead of repeated object creation.
}
return XrdSsiRequest::PRD_Normal; // Indicate what type of post-processing is required (normal in this case)
}
void TestSsiRequest::Alert(XrdSsiRespInfoMsg &aMsg)
{
using namespace std;
int aMsgLen;
char *aMsgData = aMsg.GetMsg(aMsgLen);
// Process the alert
cout << "Received Alert message: " << aMsgData << endl;
// Failure to recycle the message will cause a memory leak
aMsg.RecycleMsg();
}
#ifndef __TEST_SSI_REQUEST_H
#define __TEST_SSI_REQUEST_H
#include <XrdSsi/XrdSsiRequest.hh>
class TestSsiRequest : public XrdSsiRequest
{
public:
// It is up to the implementation to create request data, save it in some manner, and provide it to
// the framework when GetRequest() is called. Optionally define the RelRequestBuffer() method to
// clean up when the framework no longer needs access to the data.
//
// The thread used to initiate a request may be the same one used in the GetRequest() call.
// Query for Andy: shouldn't the return type for GetRequest be const?
virtual char *GetRequest(int &dlen) override {dlen = reqBLen; return const_cast<char*>(reqBuff);}
// Requests are sent to the server asynchronously via the service object. The ProcessResponse() callback
// is used to inform the request object if the request completed or failed.
virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo) override;
// ProcessReesponseData() is an optional callback used in conjunction with the request's GetResponseData() method,
// or when the response is a data stream and you wish to asynchronously receive data via the stream. Most
// applications will need to implement this as scalable applications generally require that any large amount of
// data be asynchronously received.
virtual XrdSsiRequest::PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int blen, bool last) override;
// Alert method is optional, by default Alert messages are ignored
virtual void Alert(XrdSsiRespInfoMsg &aMsg) override;
// Constructor/Destructor
TestSsiRequest(const std::string &buffer_str, uint16_t tmo=0) : reqBuff(buffer_str.c_str()), reqBLen(buffer_str.length()), queue_on_hold(true)
{
std::cerr << "Creating TestSsiRequest object, setting tmo=" << tmo << std::endl;
this->SetTimeOut(tmo);
}
virtual ~TestSsiRequest()
{
std::cerr << "Deleting TestSsiRequest object" << std::endl;
}
private:
const char *reqBuff;
int reqBLen;
bool queue_on_hold;
};
#endif
......@@ -4,8 +4,9 @@
#include <iostream>
#include <stdexcept>
#include "XrdSsi/XrdSsiProvider.hh"
#include "XrdSsi/XrdSsiService.hh"
#include <XrdSsi/XrdSsiProvider.hh>
#include <XrdSsi/XrdSsiService.hh>
#include "TestSsiRequest.h"
// Probably we want to allow multiple resources, e.g. streaming and non-streaming versions of the service
// Can this be defined in the protobuf definition?
......@@ -19,6 +20,7 @@ const std::string TestSsiResource("/test");
class XrdSsiException : public std::exception
{
public:
XrdSsiException(const std::string &err_msg) : error_msg(err_msg) {}
XrdSsiException(const XrdSsiErrInfo &eInfo) : error_msg(eInfo.Get()) {}
const char* what() const noexcept { return error_msg.c_str(); }
......@@ -35,12 +37,13 @@ extern XrdSsiProvider *XrdSsiProviderClient;
template <typename RequestType, typename ResponseType>
class TestSsiService
{
public:
TestSsiService() = delete;
TestSsiService(std::string hostname, int port) : resource(TestSsiResource)
TestSsiService(std::string hostname, int port, int to = 15) : resource(TestSsiResource), timeout(to)
{
XrdSsiErrInfo eInfo;
......@@ -69,10 +72,39 @@ public:
}
}
void send(RequestType request_msg)
{
// Requests are always executed in the context of a service. They need to correspond to what the service allows.
XrdSsiRequest *requestP;
// Serialize the request object
std::string data;
if(!request_msg.SerializeToString(&data))
{
throw XrdSsiException("SerializeToString() failed");
}
requestP = new TestSsiRequest(data, timeout);
// Transfer ownership of the request to the service object
// TestSsiRequest handles deletion of the data buffer, so we can allow the pointer to go out-of-scope
serverP->ProcessRequest(*requestP, resource);
// Note: it is safe to delete the XrdSsiResource object after ProcessRequest() returns. I don't delete it because
// I am assuming I can reuse it, but I need to check if that is a safe assumption. Perhaps I need to create a new
// resource object for each request?
}
private:
const XrdSsiResource resource; // Requests are bound to this resource
XrdSsiResource resource; // Requests are bound to this resource
XrdSsiService *serverP; // Pointer to XRootD Server object
int timeout; // Server timeout
};
#endif
......@@ -5,19 +5,19 @@
int main(int argc, char *argv[])
{
const std::string host = "localhost";
const int port = 10400;
// Verify that the version of the Google Protocol Buffer library that we linked against is
// compatible with the version of the headers we compiled against
GOOGLE_PROTOBUF_VERIFY_VERSION;
// Obtain a Service Provider
const std::string host = "localhost";
const int port = 10400;
try
{
TestSsiService test_ssi_service(host, port);
// Obtain a Service Provider
TestSsiService<xrdssi::test::Request, xrdssi::test::Result> test_ssi_service(host, port);
// Create a Request object
......@@ -25,36 +25,14 @@ int main(int argc, char *argv[])
request.set_message_text("Archive some file");
// Output message in Json format
// Output message in Json format (for debugging only)
std::cout << "Sending message:" << std::endl;
std::cout << xrdssi::test::MessageToJsonString(request);
// Send the Request to the Service
// Initiate a Request
#if 0
// Requests are always executed in the context of a service. They need to correspond to what the service allows.
XrdSsiRequest *theRequest; // Used for demonstration purposes
// Create a request object (upcast from your implementation)
int reqDLen = 1024;
char reqData[reqDLen];
theRequest = new MyRequest(reqData, reqDLen, 5);
// Transfer ownership of the request to the service object
servP->ProcessRequest(*theRequest, theResource);
// MyRequest object handles deletion of the data buffer, we shall never mention this pointer again...
theRequest = NULL;
// Note: it is safe to delete the XrdSsiResource object after ProcessRequest() returns.
#endif
test_ssi_service.send(request);
// Wait for the response callback
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment