Skip to content
Snippets Groups Projects
Commit e4deee51 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add optional argument `ordered` to consumer function get_next. If true...

Add optional argument `ordered` to consumer function get_next. If true messages will be ordered by message_id otherwise by _id.
parent 7cc30f45
No related branches found
No related tags found
No related merge requests found
......@@ -149,7 +149,7 @@ class Consumer {
*/
virtual std::string GetStreamMeta(const std::string& stream, Error* err) = 0;
//! Receive next available message.
//! Receive next by ID message.
/*!
\param info - where to store message metadata. Can be set to nullptr only message data is needed.
\param group_id - group id to use
......@@ -158,6 +158,7 @@ class Consumer {
\return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0;
virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) = 0;
//! Retrieves message using message meta.
/*!
......
......@@ -346,6 +346,11 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group
ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_ms=" +
std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_);
}
if (op == GetMessageServerOperation::GetNextAvailable) {
ri.extra_params += "&id_key=_id";
} else {
ri.extra_params += "&id_key=message_id";
}
RequestOutput output;
err = ProcessRequest(&output, ri, &current_broker_uri_);
*response = std::move(output.string_output);
......@@ -390,6 +395,24 @@ Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData
data);
}
Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) {
if ( ordered ) {
return GetMessageFromServer(GetMessageServerOperation::GetNext,
0,
std::move(group_id),
std::move(stream),
info,
data);
} else {
return GetMessageFromServer(GetMessageServerOperation::GetNextAvailable,
0,
std::move(group_id),
std::move(stream),
info,
data);
}
}
Error ConsumerImpl::GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) {
return GetMessageFromServer(GetMessageServerOperation::GetLastInGroup,
0,
......@@ -413,6 +436,8 @@ std::string ConsumerImpl::OpToUriCmd(GetMessageServerOperation op) {
switch (op) {
case GetMessageServerOperation::GetNext:
return "next";
case GetMessageServerOperation::GetNextAvailable:
return "next";
case GetMessageServerOperation::GetLastInGroup:
return "groupedlast";
case GetMessageServerOperation::GetLast:
......
......@@ -13,6 +13,7 @@ namespace asapo {
enum class GetMessageServerOperation {
GetNext,
GetNextAvailable,
GetLast,
GetID,
GetLastInGroup,
......@@ -74,6 +75,7 @@ class ConsumerImpl final : public asapo::Consumer {
Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) override;
Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override;
Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) override;
Error GetLast(MessageMeta* info, MessageData* data, std::string stream) override;
Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override;
......
......@@ -72,7 +72,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
void ForceNoRdma()
Error DisableMonitoring(bool enabled)
NetworkConnectionType CurrentConnectionType()
Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream)
Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream, bool ordered)
Error GetLast(MessageMeta* info, MessageData* data, string stream)
Error GetLast(string group_id, MessageMeta* info, MessageData* data, string stream)
Error GetById(uint64_t id, MessageMeta* info, MessageData* data, string stream)
......
......@@ -134,7 +134,10 @@ cdef class PyConsumer:
cdef np.npy_intp dims[1]
if op == "next":
with nogil:
err = self.c_consumer.get().GetNext(b_group_id, &info, p_data,b_stream)
err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, False)
elif op == "next_available":
with nogil:
err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, True)
elif op == "last" and group_id == "":
with nogil:
err = self.c_consumer.get().GetLast(&info, p_data, b_stream)
......@@ -155,8 +158,11 @@ cdef class PyConsumer:
arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
PyArray_ENABLEFLAGS(arr,np.NPY_ARRAY_OWNDATA)
return arr,meta
def get_next(self, group_id, meta_only = True, stream = "default"):
return self._op("next",group_id,stream,meta_only,0)
def get_next(self, group_id, meta_only = True, stream = "default", ordered = True):
if ordered:
return self._op("next",group_id,stream,meta_only,0)
else:
return self._op("next_available",group_id,stream,meta_only,0)
def get_last(self, meta_only = True, stream = "default", group_id = ""):
return self._op("last",group_id,stream,meta_only,0)
def get_by_id(self,uint64_t id,meta_only = True, stream = "default"):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment