diff --git a/cmdline/CMakeLists.txt b/cmdline/CMakeLists.txt index 345c33a87cd8c907f33fc15ff68cbc2c09cccc21..be3dd5d4a9b197e5460d28648b5ab0b79a72a5d0 100644 --- a/cmdline/CMakeLists.txt +++ b/cmdline/CMakeLists.txt @@ -51,5 +51,7 @@ install(TARGETS eoscta_stub DESTINATION usr/bin) add_executable(cta_admin CtaAdminCmd.cpp CtaAdminCmdParse.cpp Configuration.cpp) target_link_libraries(cta_admin XrdCtaMessages XrdSsi-4 XrdSsiLib) set_property (TARGET cta_admin APPEND PROPERTY INSTALL_RPATH ${PROTOBUF3_RPATH}) +# Get some extra debug messages on stdout +target_compile_definitions(cta_admin PUBLIC XRDSSI_DEBUG) install(TARGETS cta_admin DESTINATION usr/bin) diff --git a/xroot_plugins/XrdCtaArchiveFileLs.hpp b/xroot_plugins/XrdCtaArchiveFileLs.hpp index 92c031044f4b38941b2df01b551869b649910341..a218c8f9922ddfb65ce206dae631f8885bffc0b0 100644 --- a/xroot_plugins/XrdCtaArchiveFileLs.hpp +++ b/xroot_plugins/XrdCtaArchiveFileLs.hpp @@ -20,42 +20,36 @@ #include <XrdSsi/XrdSsiStream.hh> -/*! - * The Buffer object is returned by active streams as they supply the buffer holding the requested - * data. Once the buffer is no longer needed it must be recycled by calling Recycle(). - */ -class ArchiveFileBuffer : public XrdSsiStream::Buffer +class ArchiveFileLsBuffer : public XrdSsiStream::Buffer { public: - ArchiveFileBuffer(char *dp = nullptr) : Buffer(dp) {} + ArchiveFileLsBuffer() { data = test; } - virtual void Recycle() {} -}; -#if 0 -virtual void Recycle() = 0; //!> Call to recycle the buffer when finished + int length() { return 14; } -char *data; //!> -> Buffer containing the data -Buffer *next; //!> For chaining by buffer receiver +private: + virtual void Recycle() { delete this; } - Buffer(char *dp=0) : data(dp), next(0) {} -virtual ~Buffer() {} -#endif + char *test = const_cast<char*>("HELLO, WORLD. "); +}; class ArchiveFileLsStream : public XrdSsiStream { public: - ArchiveFileLsStream() : - XrdSsiStream(XrdSsiStream::isActive), - m_archiveFileLsBuffer(test_buf) {} + ArchiveFileLsStream() : XrdSsiStream(XrdSsiStream::isActive) { + std::cerr << "[DEBUG] ArchiveFileLsStream() constructor" << std::endl; + } - virtual ~ArchiveFileLsStream() {} + virtual ~ArchiveFileLsStream() { + std::cerr << "[DEBUG] ArchiveFileLsStream() destructor" << std::endl; + } /*! * Synchronously obtain data from an active stream (server-side only). * - * @param[out] eRef The object to receive any error description. + * @param[out] eInfo The object to receive any error description. * @param[in,out] dlen input: the optimal amount of data wanted (this is a hint) * output: the actual amount of data returned in the buffer. * @param[in,out] last input: should be set to false. @@ -69,15 +63,14 @@ public: * last = false: A fatal error occurred, eRef has the reason. */ virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) { - dlen = strlen(m_archiveFileLsBuffer.data); - last = false; +std::cerr << "Called ArchiveFileLsStream::GetBuff" << std::endl; + ArchiveFileLsBuffer *buffer = new ArchiveFileLsBuffer(); +std::cerr << "Calling ArchiveFileLsBuffer::length" << std::endl; + dlen = buffer->length(); + last = true; - return &m_archiveFileLsBuffer; +std::cerr << "Returning buffer" << std::endl; + return buffer; } - -private: - char* test_buf = const_cast<char*>("Hello, world!\n"); - - ArchiveFileBuffer m_archiveFileLsBuffer; }; diff --git a/xroot_ssi_pb/XrdSsiPbRequest.hpp b/xroot_ssi_pb/XrdSsiPbRequest.hpp index a26b743ee0b88025a93c14152ca50bd4c2da4eaf..bb578473678730d3603cab312f9a2dc33f3b50c8 100644 --- a/xroot_ssi_pb/XrdSsiPbRequest.hpp +++ b/xroot_ssi_pb/XrdSsiPbRequest.hpp @@ -20,6 +20,7 @@ #include <future> #include <XrdSsi/XrdSsiRequest.hh> +#include <XrdSsi/XrdSsiStream.hh> #ifdef XRDSSI_DEBUG #include <XrdSsiPbDebug.hpp> @@ -64,7 +65,6 @@ public: * The implementation of GetRequest() must create request data, save it in some manner, and provide * it to the framework. */ - virtual char *GetRequest(int &reqlen) override { reqlen = m_request_str.size(); @@ -84,7 +84,6 @@ public: /*! * Return the future associated with this object's promise */ - auto GetFuture() { return m_promise.get_future(); } private: @@ -150,25 +149,19 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi case XrdSsiRespInfo::isData: // Process Metadata - ProcessResponseMetadata(); // Process Data - if(rInfo.blen > 0) { // Process Data Response - m_response_bufptr = new char[m_response_bufsize]; // GetResponseData() copies one chunk of data into the buffer, then calls ProcessResponseData() - GetResponseData(m_response_bufptr, m_response_bufsize); - } else { - // Response is Metadata-only + } else { // Response is Metadata-only // Return control of the object to the calling thread and delete rInfo - Finished(); // It is now safe to delete the Request object (implies that the pointer on the calling side will @@ -178,6 +171,31 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi } break; + // Stream response + + case XrdSsiRespInfo::isStream: + + // Process Metadata + ProcessResponseMetadata(); + + { + XrdSsiErrInfo errInfo; + + bool is_ok = rInfo.strmP->SetBuff(errInfo, m_response_bufptr, m_response_bufsize); + + if(!is_ok) throw XrdSsiException(errInfo); + } +#if 0 + // Process Data Response + m_response_bufptr = new char[m_response_bufsize]; + + // GetResponseData() copies one chunk of data into the buffer, then calls ProcessResponseData() +std::cerr << "isStream/GetResponseData" << std::endl; + GetResponseData(m_response_bufptr, m_response_bufsize); +std::cerr << "isStream/GetResponseData-done" << std::endl; +#endif + break; + // Handle errors in the XRootD framework (e.g. no response from server) case XrdSsiRespInfo::isError: throw XrdSsiException(eInfo); @@ -190,10 +208,6 @@ bool Request<RequestType, MetadataType, AlertType>::ProcessResponse(const XrdSsi case XrdSsiRespInfo::isFile: throw XrdSsiException("File requests are not implemented."); - // To implement stream requests, add another callback type - - case XrdSsiRespInfo::isStream: throw XrdSsiException("Stream requests are not implemented."); - // Handle invalid responses case XrdSsiRespInfo::isNone: @@ -287,20 +301,16 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType> { // Handle one block of response data + response_bufptr[response_buflen] = 0; + std::cerr << response_bufptr << std::endl; + // TO DO: Provide an interface to the client to read a chunk of data } - if(!is_last) - { - // If there is more data, get the next chunk - - GetResponseData(response_bufptr, m_response_bufsize); - } - else + if(is_last) { // No more data, so clean up - - delete[] response_bufptr; + std::cerr << "ProcessResponseData: done" << std::endl; // If Request objects are uniform, we could re-use them instead of deleting them, to avoid the // overhead of repeated object creation. This would require a more complex Request factory. For @@ -309,6 +319,13 @@ XrdSsiRequest::PRD_Xeq Request<RequestType, MetadataType, AlertType> Finished(); delete this; } + else + { + std::cerr << "ProcessResponseData: read next chunk" << std::endl; + // If there is more data, get the next chunk + + GetResponseData(response_bufptr, m_response_bufsize); + } // Indicate what type of post-processing is required (normal in this case) diff --git a/xroot_ssi_pb/XrdSsiPbRequestProc.hpp b/xroot_ssi_pb/XrdSsiPbRequestProc.hpp index 4fb221aa8b7429f1c996ee307e83564a224a0ae7..f62b85e528b84b6407629bba80fa8635ff7431aa 100644 --- a/xroot_ssi_pb/XrdSsiPbRequestProc.hpp +++ b/xroot_ssi_pb/XrdSsiPbRequestProc.hpp @@ -178,20 +178,26 @@ void RequestProc<RequestType, MetadataType, AlertType>::Execute() // Send the Response - if(m_response_str.size() == 0) + if(m_response_stream_ptr != nullptr) { - // Send a metadata-only response. If the Response is empty, we still have to call SetResponse(), - // otherwise Finished() will not be called on the Request. + // Stream Response - SetNilResponse(); + SetResponse(m_response_stream_ptr); } - else if(m_response_stream_ptr != nullptr) + else if(m_response_str.size() != 0) { - SetResponse(m_response_stream_ptr); + // Data Response + + SetResponse(m_response_str.c_str(), m_response_str.size()); } else { - SetResponse(m_response_str.c_str(), m_response_str.size()); + // Metadata-only Response + // + // It is necessary to set a Response even for empty responses, otherwise Finished() + // will not be called on the Request. + + SetNilResponse(); } // Wait for the framework to call Finished()