diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 03b48601e4a9c2f0143697249cff6a6d91a12070..fbc6bebec3fc76fab9eb608be238f15f73578afa 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -149,7 +149,7 @@ class Consumer { */ virtual std::string GetStreamMeta(const std::string& stream, Error* err) = 0; - //! Receive next available message. + //! Receive next by ID message. /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. \param group_id - group id to use @@ -158,6 +158,7 @@ class Consumer { \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0; + virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) = 0; //! Retrieves message using message meta. /*! diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 8710b5417b87bd7806a1481b93ef6fb2a2c9ae87..1f04b8036f9b735ad788fa7e13fa92789be62064 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -346,6 +346,11 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_ms=" + std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_); } + if (op == GetMessageServerOperation::GetNextAvailable) { + ri.extra_params += "&id_key=_id"; + } else { + ri.extra_params += "&id_key=message_id"; + } RequestOutput output; err = ProcessRequest(&output, ri, ¤t_broker_uri_); *response = std::move(output.string_output); @@ -390,6 +395,24 @@ Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData data); } +Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) { + if ( ordered ) { + return GetMessageFromServer(GetMessageServerOperation::GetNext, + 0, + std::move(group_id), + std::move(stream), + info, + data); + } else { + return GetMessageFromServer(GetMessageServerOperation::GetNextAvailable, + 0, + std::move(group_id), + std::move(stream), + info, + data); + } +} + Error ConsumerImpl::GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) { return GetMessageFromServer(GetMessageServerOperation::GetLastInGroup, 0, @@ -413,6 +436,8 @@ std::string ConsumerImpl::OpToUriCmd(GetMessageServerOperation op) { switch (op) { case GetMessageServerOperation::GetNext: return "next"; + case GetMessageServerOperation::GetNextAvailable: + return "next"; case GetMessageServerOperation::GetLastInGroup: return "groupedlast"; case GetMessageServerOperation::GetLast: diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index d1672ea1edcecede0470d4ffb93b443d4efa9b2b..f8556e58c3799f8af9fcde5a48feb937d7d423c2 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -13,6 +13,7 @@ namespace asapo { enum class GetMessageServerOperation { GetNext, + GetNextAvailable, GetLast, GetID, GetLastInGroup, @@ -74,6 +75,7 @@ class ConsumerImpl final : public asapo::Consumer { Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) override; Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; + Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) override; Error GetLast(MessageMeta* info, MessageData* data, std::string stream) override; Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 05e32cf8beeaa82f58002979a657c0b257391938..9558fc785475727981f19ef96d151771a8fa6ece 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -72,7 +72,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: void ForceNoRdma() Error DisableMonitoring(bool enabled) NetworkConnectionType CurrentConnectionType() - Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream) + Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream, bool ordered) Error GetLast(MessageMeta* info, MessageData* data, string stream) Error GetLast(string group_id, MessageMeta* info, MessageData* data, string stream) Error GetById(uint64_t id, MessageMeta* info, MessageData* data, string stream) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 3ba383a617c7f7d80158cc26e09f34a41527315f..6c5ceb36766eb20d71f85734b32bfcd0e64b28ce 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -134,7 +134,10 @@ cdef class PyConsumer: cdef np.npy_intp dims[1] if op == "next": with nogil: - err = self.c_consumer.get().GetNext(b_group_id, &info, p_data,b_stream) + err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, False) + elif op == "next_available": + with nogil: + err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, True) elif op == "last" and group_id == "": with nogil: err = self.c_consumer.get().GetLast(&info, p_data, b_stream) @@ -155,8 +158,11 @@ cdef class PyConsumer: arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) PyArray_ENABLEFLAGS(arr,np.NPY_ARRAY_OWNDATA) return arr,meta - def get_next(self, group_id, meta_only = True, stream = "default"): - return self._op("next",group_id,stream,meta_only,0) + def get_next(self, group_id, meta_only = True, stream = "default", ordered = True): + if ordered: + return self._op("next",group_id,stream,meta_only,0) + else: + return self._op("next_available",group_id,stream,meta_only,0) def get_last(self, meta_only = True, stream = "default", group_id = ""): return self._op("last",group_id,stream,meta_only,0) def get_by_id(self,uint64_t id,meta_only = True, stream = "default"):