Skip to content
Snippets Groups Projects
Commit d41fa27a authored by Michael Davis's avatar Michael Davis
Browse files

[ssi_af_ls] Sends single object across stream from server to client

parent 64f7c304
No related branches found
No related tags found
No related merge requests found
......@@ -51,5 +51,7 @@ install(TARGETS eoscta_stub DESTINATION usr/bin)
add_executable(cta_admin CtaAdminCmd.cpp CtaAdminCmdParse.cpp Configuration.cpp)
target_link_libraries(cta_admin XrdCtaMessages XrdSsi-4 XrdSsiLib)
set_property (TARGET cta_admin APPEND PROPERTY INSTALL_RPATH ${PROTOBUF3_RPATH})
# Get some extra debug messages on stdout
target_compile_definitions(cta_admin PUBLIC XRDSSI_DEBUG)
install(TARGETS cta_admin DESTINATION usr/bin)
......@@ -20,42 +20,36 @@
#include <XrdSsi/XrdSsiStream.hh>
/*!
* The Buffer object is returned by active streams as they supply the buffer holding the requested
* data. Once the buffer is no longer needed it must be recycled by calling Recycle().
*/
class ArchiveFileBuffer : public XrdSsiStream::Buffer
class ArchiveFileLsBuffer : public XrdSsiStream::Buffer
{
public:
ArchiveFileBuffer(char *dp = nullptr) : Buffer(dp) {}
ArchiveFileLsBuffer() { data = test; }
virtual void Recycle() {}
};
#if 0
virtual void Recycle() = 0; //!> Call to recycle the buffer when finished
int length() { return 14; }
char *data; //!> -> Buffer containing the data
Buffer *next; //!> For chaining by buffer receiver
private:
virtual void Recycle() { delete this; }
Buffer(char *dp=0) : data(dp), next(0) {}
virtual ~Buffer() {}
#endif
char *test = const_cast<char*>("HELLO, WORLD. ");
};
class ArchiveFileLsStream : public XrdSsiStream
{
public:
ArchiveFileLsStream() :
XrdSsiStream(XrdSsiStream::isActive),
m_archiveFileLsBuffer(test_buf) {}
ArchiveFileLsStream() : XrdSsiStream(XrdSsiStream::isActive) {
std::cerr << "[DEBUG] ArchiveFileLsStream() constructor" << std::endl;
}
virtual ~ArchiveFileLsStream() {}
virtual ~ArchiveFileLsStream() {
std::cerr << "[DEBUG] ArchiveFileLsStream() destructor" << std::endl;
}
/*!
* Synchronously obtain data from an active stream (server-side only).
*
* @param[out] eRef The object to receive any error description.
* @param[out] eInfo The object to receive any error description.
* @param[in,out] dlen input: the optimal amount of data wanted (this is a hint)
* output: the actual amount of data returned in the buffer.
* @param[in,out] last input: should be set to false.
......@@ -69,15 +63,14 @@ public:
* last = false: A fatal error occurred, eRef has the reason.
*/
virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) {
dlen = strlen(m_archiveFileLsBuffer.data);
last = false;
std::cerr << "Called ArchiveFileLsStream::GetBuff" << std::endl;
ArchiveFileLsBuffer *buffer = new ArchiveFileLsBuffer();
std::cerr << "Calling ArchiveFileLsBuffer::length" << std::endl;
dlen = buffer->length();
last = true;
return &m_archiveFileLsBuffer;
std::cerr << "Returning buffer" << std::endl;
return buffer;
}
private:
char* test_buf = const_cast<char*>("Hello, world!\n");
ArchiveFileBuffer m_archiveFileLsBuffer;
};
......@@ -20,6 +20,7 @@
#include <future>
#include <XrdSsi/XrdSsiRequest.hh>
#include <XrdSsi/XrdSsiStream.hh>
#ifdef XRDSSI_DEBUG
#include <XrdSsiPbDebug.hpp>
......@@ -64,7 +65,6 @@ public:
* The implementation of GetRequest() must create request data, save it in some manner, and provide
* it to the framework.
*/
virtual char *GetRequest(int &reqlen) override
{
reqlen = m_request_str.size();
......@@ -84,7 +84,6 @@ public:
/*!
* Return the future associated with this object's promise
*/
auto GetFuture() { return m_promise.get_future(); }
private:
......@@ -150,25 +149,19 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi
case XrdSsiRespInfo::isData:
// Process Metadata
ProcessResponseMetadata();
// Process Data
if(rInfo.blen > 0)
{
// Process Data Response
m_response_bufptr = new char[m_response_bufsize];
// GetResponseData() copies one chunk of data into the buffer, then calls ProcessResponseData()
GetResponseData(m_response_bufptr, m_response_bufsize);
} else {
// Response is Metadata-only
} else { // Response is Metadata-only
// Return control of the object to the calling thread and delete rInfo
Finished();
// It is now safe to delete the Request object (implies that the pointer on the calling side will
......@@ -178,6 +171,31 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi
}
break;
// Stream response
case XrdSsiRespInfo::isStream:
// Process Metadata
ProcessResponseMetadata();
{
XrdSsiErrInfo errInfo;
bool is_ok = rInfo.strmP->SetBuff(errInfo, m_response_bufptr, m_response_bufsize);
if(!is_ok) throw XrdSsiException(errInfo);
}
#if 0
// Process Data Response
m_response_bufptr = new char[m_response_bufsize];
// GetResponseData() copies one chunk of data into the buffer, then calls ProcessResponseData()
std::cerr << "isStream/GetResponseData" << std::endl;
GetResponseData(m_response_bufptr, m_response_bufsize);
std::cerr << "isStream/GetResponseData-done" << std::endl;
#endif
break;
// Handle errors in the XRootD framework (e.g. no response from server)
case XrdSsiRespInfo::isError: throw XrdSsiException(eInfo);
......@@ -190,10 +208,6 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi
case XrdSsiRespInfo::isFile: throw XrdSsiException("File requests are not implemented.");
// To implement stream requests, add another callback type
case XrdSsiRespInfo::isStream: throw XrdSsiException("Stream requests are not implemented.");
// Handle invalid responses
case XrdSsiRespInfo::isNone:
......@@ -287,20 +301,16 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType>
{
// Handle one block of response data
response_bufptr[response_buflen] = 0;
std::cerr << response_bufptr << std::endl;
// TO DO: Provide an interface to the client to read a chunk of data
}
if(!is_last)
{
// If there is more data, get the next chunk
GetResponseData(response_bufptr, m_response_bufsize);
}
else
if(is_last)
{
// No more data, so clean up
delete[] response_bufptr;
std::cerr << "ProcessResponseData: done" << std::endl;
// If Request objects are uniform, we could re-use them instead of deleting them, to avoid the
// overhead of repeated object creation. This would require a more complex Request factory. For
......@@ -309,6 +319,13 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType>
Finished();
delete this;
}
else
{
std::cerr << "ProcessResponseData: read next chunk" << std::endl;
// If there is more data, get the next chunk
GetResponseData(response_bufptr, m_response_bufsize);
}
// Indicate what type of post-processing is required (normal in this case)
......
......@@ -178,20 +178,26 @@ void RequestProc<RequestType, MetadataType, AlertType>::Execute()
// Send the Response
if(m_response_str.size() == 0)
if(m_response_stream_ptr != nullptr)
{
// Send a metadata-only response. If the Response is empty, we still have to call SetResponse(),
// otherwise Finished() will not be called on the Request.
// Stream Response
SetNilResponse();
SetResponse(m_response_stream_ptr);
}
else if(m_response_stream_ptr != nullptr)
else if(m_response_str.size() != 0)
{
SetResponse(m_response_stream_ptr);
// Data Response
SetResponse(m_response_str.c_str(), m_response_str.size());
}
else
{
SetResponse(m_response_str.c_str(), m_response_str.size());
// Metadata-only Response
//
// It is necessary to set a Response even for empty responses, otherwise Finished()
// will not be called on the Request.
SetNilResponse();
}
// Wait for the framework to call Finished()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment