Skip to content
Snippets Groups Projects
Commit 02cd1e93 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add doc string to get_next and get_stream_list consumer functions.

parent b3546074
No related branches found
No related tags found
No related merge requests found
......@@ -91,8 +91,14 @@ class Consumer {
virtual NetworkConnectionType CurrentConnectionType() const = 0;
//! Get list of streams with filter, set from to "" to get all streams
/*!
\param from - name of stream to cut list of streams
\param filter - filter list of streams: all, finished or unfinished.
\param detailed - last timestamp, last message id and stream finish flag are filled only if true.
\return Error - will be nullptr on success
*/
virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0;
virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0;
virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0;
//! Get list of datasources for beamtime given in the consumer constructor
/*!
......@@ -164,6 +170,16 @@ class Consumer {
\return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0;
//! Receive next by ID message.
/*!
\param info - where to store message metadata. Can be set to nullptr only message data is needed.
\param group_id - group id to use
\param data - where to store message data. Can be set to nullptr only message metadata is needed.
\param stream - stream to use
\param ordered - order return messages if true, overwise return next available message.
\return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) = 0;
//! Retrieves message using message meta.
......
......@@ -159,6 +159,21 @@ cdef class PyConsumer:
PyArray_ENABLEFLAGS(arr,np.NPY_ARRAY_OWNDATA)
return arr,meta
def get_next(self, group_id, meta_only = True, stream = "default", ordered = True):
"""Return next (with respect to given group id) message for given stream.
Function iterates across stream for given consumer group (create one, if not exists).
For each consumer group each message will be returned only once.
If message is not acknowledged, and resend functionality is activated the message will be returned again.
:param group_id: Name of the consumers group id. Next message is returned with respect of consumer group.
:type group_id: string
:param meta_only: If true, return only message metadata
:type meta_only: bool
:param stream: Name of stream to consider
:type stream: string
:param ordered: It true return ordered messages, overwise return next available message.
:type ordered: bool
"""
if ordered:
return self._op("next",group_id,stream,meta_only,0)
else:
......@@ -271,6 +286,15 @@ cdef class PyConsumer:
throw_exception(err)
return _str(group_id)
def get_stream_list(self,from_stream = "",filter="all", detailed=True):
"""Return list of streams with filter, set from to "" to get all streams for current datasource
:param from_stream: name of stream to cut list of streams
:type from_stream: string
:param filter: filter list of streams. Possible values: `["all", "finished", "unfinished"]`.
:type filter: string
:param detailed: Flag to return detailed stream information. Last timestamp, last message id and stream finish flag are filled only if true.
:type detailed: bool
"""
cdef Error err
cdef vector[StreamInfo] streams
cdef string b_from_stream = _bytes(from_stream)
......
......@@ -17,6 +17,7 @@ Consumer API is available for C++ and Python and has the following main function
- GetNext to receive process messages one after another without need to know message indexes
- Consumer API returns a message with index 1, then 2, ... as they were set by producer.
- This also works in parallel so that payload is distributed within multiple consumers within same consumer group or between threads of a single consumer instance. In parallel case order of indexes of the messages is not determined.
- Optionaly it can return messages in their arrival order (return next available message). It is useful for the fast data-processing agnostic to message index.
- GetLast to receive last available message - for e.g. live visualisation
- GetById - get message by index - provides random access
- Make queries based on metadata contained in a message - returns all messages in a stream with specific metadata. A subset of SQL language is used
......
[
"23.11.0",
"22.03.0",
"21.12.0",
"21.09.0",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment