From f0dbb4c834a451fe77edd888471d8540192c5577 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 23 Dec 2020 11:03:58 +0100 Subject: [PATCH] change ack functions --- CHANGELOG.md | 11 +++--- .../api/cpp/include/asapo/consumer/consumer.h | 36 +++++++++---------- consumer/api/cpp/src/consumer_impl.cpp | 12 +++---- consumer/api/cpp/src/consumer_impl.h | 8 ++--- .../api/cpp/unittests/test_consumer_impl.cpp | 6 ++-- consumer/api/python/asapo_consumer.pxd | 4 +-- consumer/api/python/asapo_consumer.pyx.in | 8 ++--- producer/api/cpp/src/producer_impl.cpp | 2 +- .../consumer/consumer_api/consumer_api.cpp | 10 +++--- .../consumer_api_python/consumer_api.py | 12 +++---- 10 files changed, 56 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e33750ece..ff5c2a4fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,9 @@ BREAKING CHANGES #### renaming - Consumer API * broker -> consumer * SetLastReadMarker/set_lastread_marker -> swap arguments +* GetUnacknowledgedTupleIds/get_unacknowledged_tuple_ids -> GetUnacknowledgedMessages/get_unacknowledged_messages +* GetLastAcknowledgedTulpeId/get_last_acknowledged_tuple_id -> GetLastAcknowledgedMessage/get_last_acknowledged_message + BUG FIXES * fix memory leak bug in Python consumer library (lead to problems when creating many consumer instances) @@ -37,8 +40,8 @@ FEATURES * New function GetLastStream/last_stream in Producer API - returns info for a stream which was created last IMPROVEMENTS -* Each data tuple automatically gets a timestamp (nanoseconds from Linux epoch) at the moment it is being inserted to a database -* GetStreamList/get_stream_list returns now sorted (by timestamp of the earliest data tuple) list of streams. Parameter `from` allows to limit the list +* Each message automatically gets a timestamp (nanoseconds from Linux epoch) at the moment it is being inserted to a database +* GetStreamList/get_stream_list returns now sorted (by timestamp of the earliest message) list of streams. Parameter `from` allows to limit the list BREAKING CHANGES * GetStreamList/get_stream_list returns now not an array of strings, but array of StreamInfos/dictionaries @@ -81,7 +84,7 @@ IMPROVEMENTS ## 20.06.0 FEATURES -* implemented acknowledeges - one can acknowledge a data tuple, get last acknowledged tuple id, get list of unacknowledged tuple ids +* implemented acknowledeges - one can acknowledge a message, get last acknowledged tuple id, get list of unacknowledged tuple ids * implement getting stream info (contains last id) by producer client (not need to have consumer client) IMPROVEMENTS @@ -91,7 +94,7 @@ IMPROVEMENTS * added simple C++ examples BUG FIXES -* check data tuple ids should be positive +* check message ids should be positive ## 20.03.0 FEATURES diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 6cdc6f63f..8af2ddfac 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -25,20 +25,20 @@ class Consumer { virtual Error SetLastReadMarker(std::string group_id, uint64_t value) = 0; virtual Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) = 0; - //! Acknowledge data tuple for specific group id and stream. + //! Acknowledge message for specific group id and stream. /*! \param group_id - group id to use. - \param id - data tuple id + \param id - message id \param stream (optional) - stream \return nullptr of command was successful, otherwise error. */ virtual Error Acknowledge(std::string group_id, uint64_t id, std::string stream = kDefaultStream) = 0; - //! Negative acknowledge data tuple for specific group id and stream. + //! Negative acknowledge message for specific group id and stream. /*! \param group_id - group id to use. - \param id - data tuple id - \param delay_ms - data tuple will be redelivered after delay, 0 to redeliver immediately + \param id - message id + \param delay_ms - message will be redelivered after delay, 0 to redeliver immediately \param stream (optional) - stream \return nullptr of command was successful, otherwise error. */ @@ -46,19 +46,19 @@ class Consumer { std::string stream = kDefaultStream) = 0; - //! Get unacknowledged tuple for specific group id and stream. + //! Get unacknowledged messages for specific group id and stream. /*! \param group_id - group id to use. \param stream (optional) - stream - \param from_id - return tuples with ids greater or equal to from (use 0 disable limit) - \param to_id - return tuples with ids less or equal to to (use 0 to disable limit) + \param from_id - return messages with ids greater or equal to from (use 0 disable limit) + \param to_id - return messages with ids less or equal to to (use 0 to disable limit) \param in (optional) - stream \param err - set to nullptr of operation succeed, error otherwise. \return vector of ids, might be empty */ - virtual IdList GetUnacknowledgedTupleIds(std::string group_id, std::string stream, uint64_t from_id, uint64_t to_id, + virtual IdList GetUnacknowledgedMessages(std::string group_id, std::string stream, uint64_t from_id, uint64_t to_id, Error* error) = 0; - virtual IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) = 0; + virtual IdList GetUnacknowledgedMessages(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) = 0; //! Set timeout for consumer operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; @@ -123,17 +123,17 @@ class Consumer { \param err - will be set to error data cannot be read, nullptr otherwise. \param group_id - group id to use. \param stream - stream to use ("" for default). - \param min_size - wait until dataset has min_size data tuples (0 for maximum size) + \param min_size - wait until dataset has min_size messages (0 for maximum size) \return DataSet - information about the dataset */ virtual DataSet GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) = 0; virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, Error* err) = 0; - //! Receive last available dataset which has min_size data tuples. + //! Receive last available dataset which has min_size messages. /*! \param err - will be set to error data cannot be read, nullptr otherwise. \param stream - stream to use ("" for default). - \param min_size - amount of data tuples in dataset (0 for maximum size) + \param min_size - amount of messages in dataset (0 for maximum size) \return DataSet - information about the dataset */ virtual DataSet GetLastDataset(std::string stream, uint64_t min_size, Error* err) = 0; @@ -144,7 +144,7 @@ class Consumer { \param id - dataset id \param err - will be set to error data cannot be read or dataset size less than min_size, nullptr otherwise. \param stream - stream to use ("" for default). - \param min_size - wait until dataset has min_size data tuples (0 for maximum size) + \param min_size - wait until dataset has min_size messages (0 for maximum size) \return DataSet - information about the dataset */ virtual DataSet GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) = 0; @@ -160,15 +160,15 @@ class Consumer { virtual Error GetById(uint64_t id, MessageMeta* info, MessageData* data) = 0; virtual Error GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) = 0; - //! Receive id of last acknowledged data tuple + //! Receive id of last acknowledged message /*! \param group_id - group id to use. \param stream (optional) - stream \param err - will be set in case of error, nullptr otherwise. \return id of the last acknowledged message, 0 if error */ - virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string stream, Error* error) = 0; - virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) = 0; + virtual uint64_t GetLastAcknowledgedMessage(std::string group_id, std::string stream, Error* error) = 0; + virtual uint64_t GetLastAcknowledgedMessage(std::string group_id, Error* error) = 0; //! Receive last available message. /*! @@ -188,7 +188,7 @@ class Consumer { virtual MessageMetas QueryMessages(std::string query, Error* err) = 0; virtual MessageMetas QueryMessages(std::string query, std::string stream, Error* err) = 0; - //! Configure resending nonacknowledged data + //! Configure resending unacknowledged data /*! \param resend - where to resend \param delay_ms - how many milliseconds to wait before resending diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index acc53f6bb..a61911c80 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -773,7 +773,7 @@ Error ConsumerImpl::Acknowledge(std::string group_id, uint64_t id, std::string s return err; } -IdList ConsumerImpl::GetUnacknowledgedTupleIds(std::string group_id, +IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, std::string stream, uint64_t from_id, uint64_t to_id, @@ -798,14 +798,14 @@ IdList ConsumerImpl::GetUnacknowledgedTupleIds(std::string group_id, return list; } -IdList ConsumerImpl::GetUnacknowledgedTupleIds(std::string group_id, +IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) { - return GetUnacknowledgedTupleIds(std::move(group_id), kDefaultStream, from_id, to_id, error); + return GetUnacknowledgedMessages(std::move(group_id), kDefaultStream, from_id, to_id, error); } -uint64_t ConsumerImpl::GetLastAcknowledgedTulpeId(std::string group_id, std::string stream, Error* error) { +uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, std::string stream, Error* error) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + @@ -828,8 +828,8 @@ uint64_t ConsumerImpl::GetLastAcknowledgedTulpeId(std::string group_id, std::str return id; } -uint64_t ConsumerImpl::GetLastAcknowledgedTulpeId(std::string group_id, Error* error) { - return GetLastAcknowledgedTulpeId(std::move(group_id), kDefaultStream, error); +uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, Error* error) { + return GetLastAcknowledgedMessage(std::move(group_id), kDefaultStream, error); } void ConsumerImpl::SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) { diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 494b50c07..0193178c3 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -60,15 +60,15 @@ class ConsumerImpl final : public asapo::Consumer { Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, std::string stream = kDefaultStream) override; - IdList GetUnacknowledgedTupleIds(std::string group_id, + IdList GetUnacknowledgedMessages(std::string group_id, std::string stream, uint64_t from_id, uint64_t to_id, Error* error) override; - IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) override; + IdList GetUnacknowledgedMessages(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) override; - uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string stream, Error* error) override; - uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) override; + uint64_t GetLastAcknowledgedMessage(std::string group_id, std::string stream, Error* error) override; + uint64_t GetLastAcknowledgedMessage(std::string group_id, Error* error) override; Error ResetLastReadMarker(std::string group_id) override; Error ResetLastReadMarker(std::string group_id, std::string stream) override; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 9675316ea..c65bee5ac 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1294,7 +1294,7 @@ void ConsumerImplTests::ExpectIdList(bool error) { TEST_F(ConsumerImplTests, GetUnAcknowledgedListReturnsIds) { ExpectIdList(false); asapo::Error err; - auto list = consumer->GetUnacknowledgedTupleIds(expected_group_id, expected_stream, 1, 0, &err); + auto list = consumer->GetUnacknowledgedMessages(expected_group_id, expected_stream, 1, 0, &err); ASSERT_THAT(list, ElementsAre(1, 2, 3)); ASSERT_THAT(err, Eq(nullptr)); @@ -1314,7 +1314,7 @@ TEST_F(ConsumerImplTests, GetLastAcknowledgeUsesOk) { ExpectLastAckId(false); asapo::Error err; - auto ind = consumer->GetLastAcknowledgedTulpeId(expected_group_id, expected_stream, &err); + auto ind = consumer->GetLastAcknowledgedMessage(expected_group_id, expected_stream, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(ind, Eq(1)); } @@ -1324,7 +1324,7 @@ TEST_F(ConsumerImplTests, GetLastAcknowledgeReturnsNoData) { ExpectLastAckId(true); asapo::Error err; - auto ind = consumer->GetLastAcknowledgedTulpeId(expected_group_id, expected_stream, &err); + auto ind = consumer->GetLastAcknowledgedMessage(expected_group_id, expected_stream, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); ASSERT_THAT(ind, Eq(0)); } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 5df0d3528..0aac9634a 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -70,8 +70,8 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: Error ResetLastReadMarker(string group_id, string stream) Error Acknowledge(string group_id, uint64_t id, string stream) Error NegativeAcknowledge(string group_id, uint64_t id, uint64_t delay_ms, string stream) - uint64_t GetLastAcknowledgedTulpeId(string group_id, string stream, Error* error) - IdList GetUnacknowledgedTupleIds(string group_id, string stream, uint64_t from_id, uint64_t to_id, Error* error) + uint64_t GetLastAcknowledgedMessage(string group_id, string stream, Error* error) + IdList GetUnacknowledgedMessages(string group_id, string stream, uint64_t from_id, uint64_t to_id, Error* error) string GenerateNewGroupId(Error* err) string GetBeamtimeMeta(Error* err) MessageMetas QueryMessages(string query, string stream, Error* err) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 15420fd15..d3e0ea51d 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -241,24 +241,24 @@ cdef class PyConsumer: with nogil: self.c_consumer.get().SetResendNacs(resend,delay_ms,resend_attempts) - def get_last_acknowledged_tuple_id(self, group_id, stream = "default"): + def get_last_acknowledged_message(self, group_id, stream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream) cdef Error err cdef uint64_t id with nogil: - id = self.c_consumer.get().GetLastAcknowledgedTulpeId(b_group_id,b_stream,&err) + id = self.c_consumer.get().GetLastAcknowledgedMessage(b_group_id,b_stream,&err) if err: throw_exception(err) return id - def get_unacknowledged_tuple_ids(self, group_id, stream = "default", uint64_t from_id = 0, uint64_t to_id = 0): + def get_unacknowledged_messages(self, group_id, stream = "default", uint64_t from_id = 0, uint64_t to_id = 0): cdef Error err cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream) cdef IdList ids with nogil: - ids = self.c_consumer.get().GetUnacknowledgedTupleIds(b_group_id, b_stream, from_id, to_id, &err) + ids = self.c_consumer.get().GetUnacknowledgedMessages(b_group_id, b_stream, from_id, to_id, &err) if err: throw_exception(err) list = [] diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 76bf9b4b2..6fc10904f 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -85,7 +85,7 @@ Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_ } if (message_header.message_id == 0) { - return ProducerErrorTemplates::kWrongInput.Generate("data tuple id should be positive"); + return ProducerErrorTemplates::kWrongInput.Generate("message id should be positive"); } return CheckIngestMode(ingest_mode); diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index e49455535..0cd7c3e4d 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -148,28 +148,28 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[2].timestamp_created) == 2000, "streams2.timestamp"); // acknowledges - auto id = consumer->GetLastAcknowledgedTulpeId(group_id, &err); + auto id = consumer->GetLastAcknowledgedMessage(group_id, &err); M_AssertTrue(err == asapo::ConsumerErrorTemplates::kNoData, "last ack default stream no data"); M_AssertTrue(id == 0, "last ack default stream no data id = 0"); - auto nacks = consumer->GetUnacknowledgedTupleIds(group_id, 0, 0, &err); + auto nacks = consumer->GetUnacknowledgedMessages(group_id, 0, 0, &err); M_AssertTrue(err == nullptr, "nacks default stream all"); M_AssertTrue(nacks.size() == 10, "nacks default stream size = 10"); err = consumer->Acknowledge(group_id, 1); M_AssertTrue(err == nullptr, "ack default stream no error"); - nacks = consumer->GetUnacknowledgedTupleIds(group_id, 0, 0, &err); + nacks = consumer->GetUnacknowledgedMessages(group_id, 0, 0, &err); M_AssertTrue(nacks.size() == 9, "nacks default stream size = 9 after ack"); - id = consumer->GetLastAcknowledgedTulpeId(group_id, &err); + id = consumer->GetLastAcknowledgedMessage(group_id, &err); M_AssertTrue(err == nullptr, "last ack default stream no error"); M_AssertTrue(id == 1, "last ack default stream id = 1"); err = consumer->Acknowledge(group_id, 1, "stream1"); M_AssertTrue(err == nullptr, "ack stream1 no error"); - nacks = consumer->GetUnacknowledgedTupleIds(group_id, "stream1", 0, 0, &err); + nacks = consumer->GetUnacknowledgedMessages(group_id, "stream1", 0, 0, &err); M_AssertTrue(nacks.size() == 4, "nacks stream1 size = 4 after ack"); // negative acks diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 6a454eced..fee6ba776 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -132,26 +132,26 @@ def check_single(consumer, group_id): # acks try: - id = consumer.get_last_acknowledged_tuple_id(group_id) + id = consumer.get_last_acknowledged_message(group_id) except asapo_consumer.AsapoNoDataError as err: print(err) pass else: - exit_on_noerr("get_last_acknowledged_tuple_id") + exit_on_noerr("get_last_acknowledged_message") - nacks = consumer.get_unacknowledged_tuple_ids(group_id) + nacks = consumer.get_unacknowledged_messages(group_id) assert_eq(len(nacks), 5, "nacks default stream size = 5") consumer.acknowledge(group_id, 1) - nacks = consumer.get_unacknowledged_tuple_ids(group_id) + nacks = consumer.get_unacknowledged_messages(group_id) assert_eq(len(nacks), 4, "nacks default stream size = 4") - id = consumer.get_last_acknowledged_tuple_id(group_id) + id = consumer.get_last_acknowledged_message(group_id) assert_eq(id, 1, "last ack default stream id = 1") consumer.acknowledge(group_id, 1, "stream1") - nacks = consumer.get_unacknowledged_tuple_ids(group_id) + nacks = consumer.get_unacknowledged_messages(group_id) assert_eq(len(nacks), 4, "nacks stream1 size = 4 after ack") # neg acks -- GitLab