Skip to content
Snippets Groups Projects
Commit fe86cb69 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add flag ordered to get_next_dataset

parent 17546d81
No related branches found
No related tags found
No related merge requests found
......@@ -201,6 +201,17 @@ class Consumer {
*/
virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) = 0;
//! Receive next available completed dataset.
/*!
\param err - will be set to error data cannot be read, nullptr otherwise.
\param group_id - group id to use.
\param min_size - wait until dataset has min_size messages (0 for maximum size)
\param stream - stream to use
\param ordered - order return messages if true, overwise return next available dataset.
\return DataSet - information about the dataset
*/
virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, bool ordered, Error* err) = 0;
//! Receive last available dataset which has min_size messages.
/*!
\param err - will be set to error data cannot be read, nullptr otherwise.
......
......@@ -773,6 +773,24 @@ DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, st
err);
}
DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, bool ordered, Error* err) {
if ( ordered ) {
return GetDatasetFromServer(GetMessageServerOperation::GetNextAvailable,
0,
std::move(group_id),
std::move(stream),
min_size,
err);
} else {
return GetDatasetFromServer(GetMessageServerOperation::GetNext,
0,
std::move(group_id),
std::move(stream),
min_size,
err);
}
}
DataSet ConsumerImpl::GetLastDataset(uint64_t min_size, std::string stream, Error* err) {
return GetDatasetFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream), min_size, err);
}
......
......@@ -101,6 +101,7 @@ class ConsumerImpl final : public asapo::Consumer {
MessageMetas QueryMessages(std::string query, std::string stream, Error* err) override;
DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override;
DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, bool ordered, Error* err) override;
DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) override;
DataSet GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override;
......
......@@ -92,6 +92,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
string GetStreamMeta(string stream,Error* err)
MessageMetas QueryMessages(string query, string stream, Error* err)
DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, Error* err)
DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, bool ordered, Error* err)
DataSet GetLastDataset(uint64_t min_size, string stream, Error* err)
DataSet GetLastDataset(string group_id, uint64_t min_size, string stream, Error* err)
DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err)
......
......@@ -165,6 +165,8 @@ cdef class PyConsumer:
For each consumer group each message will be returned only once.
If message is not acknowledged, and resend functionality is activated the message will be returned again.
Flag ordered will be deprecated and in the future False value will be used. Ordered messages can be retrieved with `get_by_id` function.
:param group_id: Name of the consumers group id. Next message is returned with respect of consumer group.
:type group_id: string
:param meta_only: If true, return only message metadata
......@@ -288,12 +290,25 @@ cdef class PyConsumer:
def get_stream_list(self,from_stream = "",filter="all", detailed=True):
"""Return list of streams with filter, set from to "" to get all streams for current datasource
:param from_stream: name of stream to cut list of streams
:type from_stream: string
:param filter: filter list of streams. Possible values: `["all", "finished", "unfinished"]`.
:type filter: string
:param detailed: Flag to return detailed stream information. Last timestamp, last message id and stream finish flag are filled only if true.
:type detailed: bool
Each entry in the list is a python dict:
.. code-block:: python
{'lastId': 10, # Index of the last message in the stream
'name': 'test001', # Name of streams
'timestampCreated': 1700046609395234833, # Timestamp of earliest message
'timestampLast': 1700046609621885918, # Timestamp of lates message
'finished': False, # Value of the stream-finish flag
'nextStream': ''} # Name of the next stream
Name of the next stream is taken from the stream-finish-flag message given by client. If this information is not given, field is empty.
:param from_stream: name of stream to cut list of streams
:type from_stream: string
:param filter: filter list of streams. Possible values: `["all", "finished", "unfinished"]`.
:type filter: string
:param detailed: Flag to return detailed stream information. Last timestamp, last message id and stream finish flag are filled only if true.
:type detailed: bool
"""
cdef Error err
cdef vector[StreamInfo] streams
......@@ -424,7 +439,9 @@ cdef class PyConsumer:
cdef Error err
if op == "next":
with nogil:
dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream, &err)
dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream,&err)
elif op == "next_available":
dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream,True,&err)
elif op == "last" and group_id == "":
with nogil:
dataset = self.c_consumer.get().GetLastDataset(min_size,b_stream, &err)
......@@ -441,8 +458,33 @@ cdef class PyConsumer:
if err:
throw_exception(err,res)
return res
def get_next_dataset(self, group_id, min_size = 0, stream = "default"):
return self._op_dataset("next",group_id,stream,min_size,0)
def get_next_dataset(self, group_id, min_size = 0, stream = "default", ordered=True):
""" Return dataset, as a list of substream-messages metadata, that corresponds to different
substreams of a given dataset. Each metadata is a python dict, that
contains user part (if given) and service part. Corresponding data can be retrieved
via `retrieve_data` function.
Function iterates across stream for given consumer group (create one, if not exists).
For each consumer group each message will be returned only once.
If message is not acknowledged, and resend functionality is activated the message will be returned again.
If dataset-message is not complete (don't contain required number of substrem-messages) consumer
returns AsapoPartialDataError. If resend functionality is activated the message will be returned later.
Flag ordered will be deprecated and in the future False value will be used. Ordered messages can be retrieved with `get_dataset_by_id` function.
:param group_id: Name of the consumers group id. Next message is returned with respect of consumer group.
:type group_id: string
:param min_size: Minimum number of substrem-messages to consider dataset complete. If 0, all substrem-messages are required.
:type min_size: int
:param stream: Name of stream to consider
:type stream: string
:param ordered: It true return ordered messages, overwise return next available message.
:type ordered: bool
"""
if ordered:
return self._op_dataset("next_available",group_id,stream,min_size,0)
else:
return self._op_dataset("next",group_id,stream,min_size,0)
def get_last_dataset(self, min_size = 0, stream = "default", group_id = ""):
return self._op_dataset("last",group_id,stream,min_size,0)
def get_dataset_by_id(self, uint64_t id, min_size = 0, stream = "default"):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment