From e4deee51c2a3f2c70571cf0bb45ad096d52cb29a Mon Sep 17 00:00:00 2001 From: karnem <mikhail.karnevskiy@desy.de> Date: Mon, 28 Aug 2023 11:08:06 +0200 Subject: [PATCH] Add optional argument `ordered` to consumer function get_next. If true messages will be ordered by message_id otherwise by _id. --- .../api/cpp/include/asapo/consumer/consumer.h | 3 ++- consumer/api/cpp/src/consumer_impl.cpp | 25 +++++++++++++++++++ consumer/api/cpp/src/consumer_impl.h | 2 ++ consumer/api/python/asapo_consumer.pxd | 2 +- consumer/api/python/asapo_consumer.pyx.in | 12 ++++++--- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 03b48601e..fbc6bebec 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -149,7 +149,7 @@ class Consumer { */ virtual std::string GetStreamMeta(const std::string& stream, Error* err) = 0; - //! Receive next available message. + //! Receive next by ID message. /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. \param group_id - group id to use @@ -158,6 +158,7 @@ class Consumer { \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0; + virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) = 0; //! Retrieves message using message meta. /*! diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 8710b5417..1f04b8036 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -346,6 +346,11 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_ms=" + std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_); } + if (op == GetMessageServerOperation::GetNextAvailable) { + ri.extra_params += "&id_key=_id"; + } else { + ri.extra_params += "&id_key=message_id"; + } RequestOutput output; err = ProcessRequest(&output, ri, ¤t_broker_uri_); *response = std::move(output.string_output); @@ -390,6 +395,24 @@ Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData data); } +Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) { + if ( ordered ) { + return GetMessageFromServer(GetMessageServerOperation::GetNext, + 0, + std::move(group_id), + std::move(stream), + info, + data); + } else { + return GetMessageFromServer(GetMessageServerOperation::GetNextAvailable, + 0, + std::move(group_id), + std::move(stream), + info, + data); + } +} + Error ConsumerImpl::GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) { return GetMessageFromServer(GetMessageServerOperation::GetLastInGroup, 0, @@ -413,6 +436,8 @@ std::string ConsumerImpl::OpToUriCmd(GetMessageServerOperation op) { switch (op) { case GetMessageServerOperation::GetNext: return "next"; + case GetMessageServerOperation::GetNextAvailable: + return "next"; case GetMessageServerOperation::GetLastInGroup: return "groupedlast"; case GetMessageServerOperation::GetLast: diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index d1672ea1e..f8556e58c 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -13,6 +13,7 @@ namespace asapo { enum class GetMessageServerOperation { GetNext, + GetNextAvailable, GetLast, GetID, GetLastInGroup, @@ -74,6 +75,7 @@ class ConsumerImpl final : public asapo::Consumer { Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) override; Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; + Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) override; Error GetLast(MessageMeta* info, MessageData* data, std::string stream) override; Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 05e32cf8b..9558fc785 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -72,7 +72,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: void ForceNoRdma() Error DisableMonitoring(bool enabled) NetworkConnectionType CurrentConnectionType() - Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream) + Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream, bool ordered) Error GetLast(MessageMeta* info, MessageData* data, string stream) Error GetLast(string group_id, MessageMeta* info, MessageData* data, string stream) Error GetById(uint64_t id, MessageMeta* info, MessageData* data, string stream) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 3ba383a61..6c5ceb367 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -134,7 +134,10 @@ cdef class PyConsumer: cdef np.npy_intp dims[1] if op == "next": with nogil: - err = self.c_consumer.get().GetNext(b_group_id, &info, p_data,b_stream) + err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, False) + elif op == "next_available": + with nogil: + err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, True) elif op == "last" and group_id == "": with nogil: err = self.c_consumer.get().GetLast(&info, p_data, b_stream) @@ -155,8 +158,11 @@ cdef class PyConsumer: arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) PyArray_ENABLEFLAGS(arr,np.NPY_ARRAY_OWNDATA) return arr,meta - def get_next(self, group_id, meta_only = True, stream = "default"): - return self._op("next",group_id,stream,meta_only,0) + def get_next(self, group_id, meta_only = True, stream = "default", ordered = True): + if ordered: + return self._op("next",group_id,stream,meta_only,0) + else: + return self._op("next_available",group_id,stream,meta_only,0) def get_last(self, meta_only = True, stream = "default", group_id = ""): return self._op("last",group_id,stream,meta_only,0) def get_by_id(self,uint64_t id,meta_only = True, stream = "default"): -- GitLab