From fe86cb691ad6395179770a5f8c6cca95f61dab65 Mon Sep 17 00:00:00 2001 From: karnem <mikhail.karnevskiy@desy.de> Date: Fri, 17 Nov 2023 14:05:27 +0100 Subject: [PATCH] Add flag ordered to get_next_dataset --- .../api/cpp/include/asapo/consumer/consumer.h | 11 ++++ consumer/api/cpp/src/consumer_impl.cpp | 18 ++++++ consumer/api/cpp/src/consumer_impl.h | 1 + consumer/api/python/asapo_consumer.pxd | 1 + consumer/api/python/asapo_consumer.pyx.in | 60 ++++++++++++++++--- 5 files changed, 82 insertions(+), 9 deletions(-) diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 9a7b29aa9..6aa9a9606 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -201,6 +201,17 @@ class Consumer { */ virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) = 0; + //! Receive next available completed dataset. + /*! + \param err - will be set to error data cannot be read, nullptr otherwise. + \param group_id - group id to use. + \param min_size - wait until dataset has min_size messages (0 for maximum size) + \param stream - stream to use + \param ordered - order return messages if true, overwise return next available dataset. + \return DataSet - information about the dataset + + */ + virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, bool ordered, Error* err) = 0; //! Receive last available dataset which has min_size messages. /*! \param err - will be set to error data cannot be read, nullptr otherwise. diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 0bc560da2..4f5da1cf1 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -773,6 +773,24 @@ DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, st err); } +DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, bool ordered, Error* err) { + if ( ordered ) { + return GetDatasetFromServer(GetMessageServerOperation::GetNextAvailable, + 0, + std::move(group_id), + std::move(stream), + min_size, + err); + } else { + return GetDatasetFromServer(GetMessageServerOperation::GetNext, + 0, + std::move(group_id), + std::move(stream), + min_size, + err); + } +} + DataSet ConsumerImpl::GetLastDataset(uint64_t min_size, std::string stream, Error* err) { return GetDatasetFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream), min_size, err); } diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 0ed900bd2..63bc6c9e0 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -101,6 +101,7 @@ class ConsumerImpl final : public asapo::Consumer { MessageMetas QueryMessages(std::string query, std::string stream, Error* err) override; DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override; + DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, bool ordered, Error* err) override; DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) override; DataSet GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override; diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 29761ad2a..de5e1c328 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -92,6 +92,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: string GetStreamMeta(string stream,Error* err) MessageMetas QueryMessages(string query, string stream, Error* err) DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, Error* err) + DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, bool ordered, Error* err) DataSet GetLastDataset(uint64_t min_size, string stream, Error* err) DataSet GetLastDataset(string group_id, uint64_t min_size, string stream, Error* err) DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 1a3fe7509..c9c330289 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -165,6 +165,8 @@ cdef class PyConsumer: For each consumer group each message will be returned only once. If message is not acknowledged, and resend functionality is activated the message will be returned again. + Flag ordered will be deprecated and in the future False value will be used. Ordered messages can be retrieved with `get_by_id` function. + :param group_id: Name of the consumers group id. Next message is returned with respect of consumer group. :type group_id: string :param meta_only: If true, return only message metadata @@ -288,12 +290,25 @@ cdef class PyConsumer: def get_stream_list(self,from_stream = "",filter="all", detailed=True): """Return list of streams with filter, set from to "" to get all streams for current datasource - :param from_stream: name of stream to cut list of streams - :type from_stream: string - :param filter: filter list of streams. Possible values: `["all", "finished", "unfinished"]`. - :type filter: string - :param detailed: Flag to return detailed stream information. Last timestamp, last message id and stream finish flag are filled only if true. - :type detailed: bool + Each entry in the list is a python dict: + + .. code-block:: python + + {'lastId': 10, # Index of the last message in the stream + 'name': 'test001', # Name of streams + 'timestampCreated': 1700046609395234833, # Timestamp of earliest message + 'timestampLast': 1700046609621885918, # Timestamp of lates message + 'finished': False, # Value of the stream-finish flag + 'nextStream': ''} # Name of the next stream + + Name of the next stream is taken from the stream-finish-flag message given by client. If this information is not given, field is empty. + + :param from_stream: name of stream to cut list of streams + :type from_stream: string + :param filter: filter list of streams. Possible values: `["all", "finished", "unfinished"]`. + :type filter: string + :param detailed: Flag to return detailed stream information. Last timestamp, last message id and stream finish flag are filled only if true. + :type detailed: bool """ cdef Error err cdef vector[StreamInfo] streams @@ -424,7 +439,9 @@ cdef class PyConsumer: cdef Error err if op == "next": with nogil: - dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream, &err) + dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream,&err) + elif op == "next_available": + dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream,True,&err) elif op == "last" and group_id == "": with nogil: dataset = self.c_consumer.get().GetLastDataset(min_size,b_stream, &err) @@ -441,8 +458,33 @@ cdef class PyConsumer: if err: throw_exception(err,res) return res - def get_next_dataset(self, group_id, min_size = 0, stream = "default"): - return self._op_dataset("next",group_id,stream,min_size,0) + def get_next_dataset(self, group_id, min_size = 0, stream = "default", ordered=True): + """ Return dataset, as a list of substream-messages metadata, that corresponds to different + substreams of a given dataset. Each metadata is a python dict, that + contains user part (if given) and service part. Corresponding data can be retrieved + via `retrieve_data` function. + + Function iterates across stream for given consumer group (create one, if not exists). + For each consumer group each message will be returned only once. + If message is not acknowledged, and resend functionality is activated the message will be returned again. + If dataset-message is not complete (don't contain required number of substrem-messages) consumer + returns AsapoPartialDataError. If resend functionality is activated the message will be returned later. + + Flag ordered will be deprecated and in the future False value will be used. Ordered messages can be retrieved with `get_dataset_by_id` function. + + :param group_id: Name of the consumers group id. Next message is returned with respect of consumer group. + :type group_id: string + :param min_size: Minimum number of substrem-messages to consider dataset complete. If 0, all substrem-messages are required. + :type min_size: int + :param stream: Name of stream to consider + :type stream: string + :param ordered: It true return ordered messages, overwise return next available message. + :type ordered: bool + """ + if ordered: + return self._op_dataset("next_available",group_id,stream,min_size,0) + else: + return self._op_dataset("next",group_id,stream,min_size,0) def get_last_dataset(self, min_size = 0, stream = "default", group_id = ""): return self._op_dataset("last",group_id,stream,min_size,0) def get_dataset_by_id(self, uint64_t id, min_size = 0, stream = "default"): -- GitLab