diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 8f923a8549edad0acd13ab9e2a0a7da3a7c5810f..9a7b29aa9b77d5ae91de56fba2325b280c114b85 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -91,8 +91,14 @@ class Consumer { virtual NetworkConnectionType CurrentConnectionType() const = 0; //! Get list of streams with filter, set from to "" to get all streams + /*! + \param from - name of stream to cut list of streams + \param filter - filter list of streams: all, finished or unfinished. + \param detailed - last timestamp, last message id and stream finish flag are filled only if true. + \return Error - will be nullptr on success + */ + virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0; virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0; - virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0; //! Get list of datasources for beamtime given in the consumer constructor /*! @@ -164,6 +170,16 @@ class Consumer { \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0; + + //! Receive next by ID message. + /*! + \param info - where to store message metadata. Can be set to nullptr only message data is needed. + \param group_id - group id to use + \param data - where to store message data. Can be set to nullptr only message metadata is needed. + \param stream - stream to use + \param ordered - order return messages if true, overwise return next available message. + \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. + */ virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) = 0; //! Retrieves message using message meta. diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 836243d9f10f8e16533c8977a7e80991c5af95ff..1a3fe7509da94bc38cbaa5824754c088c2fdbab9 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -159,6 +159,21 @@ cdef class PyConsumer: PyArray_ENABLEFLAGS(arr,np.NPY_ARRAY_OWNDATA) return arr,meta def get_next(self, group_id, meta_only = True, stream = "default", ordered = True): + """Return next (with respect to given group id) message for given stream. + + Function iterates across stream for given consumer group (create one, if not exists). + For each consumer group each message will be returned only once. + If message is not acknowledged, and resend functionality is activated the message will be returned again. + + :param group_id: Name of the consumers group id. Next message is returned with respect of consumer group. + :type group_id: string + :param meta_only: If true, return only message metadata + :type meta_only: bool + :param stream: Name of stream to consider + :type stream: string + :param ordered: It true return ordered messages, overwise return next available message. + :type ordered: bool + """ if ordered: return self._op("next",group_id,stream,meta_only,0) else: @@ -271,6 +286,15 @@ cdef class PyConsumer: throw_exception(err) return _str(group_id) def get_stream_list(self,from_stream = "",filter="all", detailed=True): + """Return list of streams with filter, set from to "" to get all streams for current datasource + + :param from_stream: name of stream to cut list of streams + :type from_stream: string + :param filter: filter list of streams. Possible values: `["all", "finished", "unfinished"]`. + :type filter: string + :param detailed: Flag to return detailed stream information. Last timestamp, last message id and stream finish flag are filled only if true. + :type detailed: bool + """ cdef Error err cdef vector[StreamInfo] streams cdef string b_from_stream = _bytes(from_stream) diff --git a/docs/site/docs/consumer-clients.md b/docs/site/docs/consumer-clients.md index d94119b240c25694c222e68ea54b2678d84b85b7..e1f570fbcd585a16b97329ab76f54673692a2acb 100644 --- a/docs/site/docs/consumer-clients.md +++ b/docs/site/docs/consumer-clients.md @@ -17,6 +17,7 @@ Consumer API is available for C++ and Python and has the following main function - GetNext to receive process messages one after another without need to know message indexes - Consumer API returns a message with index 1, then 2, ... as they were set by producer. - This also works in parallel so that payload is distributed within multiple consumers within same consumer group or between threads of a single consumer instance. In parallel case order of indexes of the messages is not determined. + - Optionaly it can return messages in their arrival order (return next available message). It is useful for the fast data-processing agnostic to message index. - GetLast to receive last available message - for e.g. live visualisation - GetById - get message by index - provides random access - Make queries based on metadata contained in a message - returns all messages in a stream with specific metadata. A subset of SQL language is used diff --git a/docs/site/versions.json b/docs/site/versions.json index 1f77ee6888be0d175e204c40c77b0853113ea7c1..1c8307d1b56888139a27f7239c0087aa3123da1b 100644 --- a/docs/site/versions.json +++ b/docs/site/versions.json @@ -1,4 +1,5 @@ [ + "23.11.0", "22.03.0", "21.12.0", "21.09.0",