From 2e099a15e6ab8c4dab5e602e7abd88744e39722e Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Fri, 20 Sep 2019 13:01:34 +0200
Subject: [PATCH] modified and renamed reset_counter, get_by_id does not reset
 counter anymore

---
 broker/src/asapo_broker/database/mongodb.go   | 10 +++--
 .../src/asapo_broker/database/mongodb_test.go |  5 +--
 broker/src/asapo_broker/server/listroutes.go  |  2 +-
 .../asapo_broker/server/post_reset_counter.go | 11 +++++-
 .../server/post_reset_counter_test.go         |  4 +-
 .../src/asapo_broker/server/request_common.go |  3 --
 .../worker/worker_api/worker_api.cpp          | 22 +++++++++--
 .../worker/worker_api_python/worker_api.py    | 17 ++++++--
 worker/api/cpp/include/worker/data_broker.h   |  3 +-
 worker/api/cpp/src/folder_data_broker.cpp     |  8 +++-
 worker/api/cpp/src/folder_data_broker.h       |  3 +-
 worker/api/cpp/src/server_data_broker.cpp     | 13 +++++--
 worker/api/cpp/src/server_data_broker.h       |  3 +-
 .../api/cpp/unittests/test_folder_broker.cpp  |  2 +-
 .../api/cpp/unittests/test_server_broker.cpp  | 39 ++++++++++++++++---
 worker/api/python/asapo_worker.pxd            |  3 +-
 worker/api/python/asapo_worker.pyx.in         | 15 ++++++-
 17 files changed, 124 insertions(+), 39 deletions(-)

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 64dc01e2e..934652125 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -344,13 +344,17 @@ func (db *Mongodb) GetSize(db_name string) ([]byte, error) {
 	return json.Marshal(&rec)
 }
 
-func (db *Mongodb) ResetCounter(db_name string, group_id string) ([]byte, error) {
+func (db *Mongodb) ResetCounter(db_name string, group_id string, id_str string) ([]byte, error) {
+	id, err := strconv.Atoi(id_str)
+	if err != nil {
+		return nil, err
+	}
 
 	if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil {
 		return nil, err
 	}
 
-	err := db.setCounter(db_name, group_id, 0)
+	err = db.setCounter(db_name, group_id, id)
 
 	return []byte(""), err
 }
@@ -422,7 +426,7 @@ func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, ex
 	case "last":
 		return db.GetLastRecord(db_name, group_id, dataset)
 	case "resetcounter":
-		return db.ResetCounter(db_name, group_id)
+		return db.ResetCounter(db_name, group_id, extra_param)
 	case "size":
 		return db.GetSize(db_name)
 	case "meta":
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 9038f6e28..5b9bf9e20 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -308,14 +308,13 @@ func TestMongoDBResetCounter(t *testing.T) {
 	assert.Nil(t, err1)
 	assert.Equal(t, string(rec1_expect), string(res1))
 
-	_, err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", "0")
+	_, err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", "1")
 	assert.Nil(t, err_reset)
 
 	res2, err2 := db.ProcessRequest(dbname, groupId, "next", "0")
 
 	assert.Nil(t, err2)
-	assert.Equal(t, string(rec1_expect), string(res2))
-
+	assert.Equal(t, string(rec2_expect), string(res2))
 }
 
 func TestMongoDBGetMetaOK(t *testing.T) {
diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go
index 6925d89e6..76595e407 100644
--- a/broker/src/asapo_broker/server/listroutes.go
+++ b/broker/src/asapo_broker/server/listroutes.go
@@ -48,7 +48,7 @@ var listRoutes = utils.Routes{
 		routeQueryImages,
 	},
 	utils.Route{
-		"ResetCounter",
+		"ResetConter",
 		"Post",
 		"/database/{dbname}/{stream}/{groupid}/resetcounter",
 		routeResetCounter,
diff --git a/broker/src/asapo_broker/server/post_reset_counter.go b/broker/src/asapo_broker/server/post_reset_counter.go
index fd881f72f..b67934d4e 100644
--- a/broker/src/asapo_broker/server/post_reset_counter.go
+++ b/broker/src/asapo_broker/server/post_reset_counter.go
@@ -4,6 +4,15 @@ import (
 	"net/http"
 )
 
+func extractRequestParametersValue(r *http.Request) string {
+	val := r.URL.Query().Get("value")
+	if len(val) == 0 {
+		return "0"
+	}
+	return val
+}
+
 func routeResetCounter(w http.ResponseWriter, r *http.Request) {
-	processRequest(w, r, "resetcounter", "0", true)
+	val := extractRequestParametersValue(r)
+	processRequest(w, r, "resetcounter", val, true)
 }
diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go
index 4cff3551b..e0b67f29a 100644
--- a/broker/src/asapo_broker/server/post_reset_counter_test.go
+++ b/broker/src/asapo_broker/server/post_reset_counter_test.go
@@ -33,10 +33,10 @@ func TestResetCounterTestSuite(t *testing.T) {
 }
 
 func (suite *ResetCounterTestSuite) TestResetCounterOK() {
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "0").Return([]byte(""), nil)
+	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter")))
 	ExpectCopyClose(suite.mock_db)
 
-	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix, "POST")
+	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST")
 	suite.Equal(http.StatusOK, w.Code, "ResetCounter OK")
 }
diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go
index d53a67d59..2790ce453 100644
--- a/broker/src/asapo_broker/server/request_common.go
+++ b/broker/src/asapo_broker/server/request_common.go
@@ -15,7 +15,6 @@ func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string,
 
 func ValueTrue(r *http.Request, key string) bool {
 	val := r.URL.Query().Get(key)
-
 	if len(val) == 0 {
 		return false
 	}
@@ -23,9 +22,7 @@ func ValueTrue(r *http.Request, key string) bool {
 	if val == "true" {
 		return true
 	}
-
 	return false
-
 }
 
 func resetRequested(r *http.Request) bool {
diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp
index 82683d07b..7b42092ff 100644
--- a/tests/automatic/worker/worker_api/worker_api.cpp
+++ b/tests/automatic/worker/worker_api/worker_api.cpp
@@ -56,13 +56,27 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str
     err = broker->GetNext(&fi, group_id, nullptr);
     M_AssertTrue(err != nullptr, "GetNext2 error");
 
-    err = broker->GetLast(&fi, group_id, nullptr);
-    M_AssertTrue(err == nullptr, "GetLast2 no error");
+    err = broker->SetLastReadMarker(2, group_id);
+    M_AssertTrue(err == nullptr, "SetLastReadMarker no error");
+
 
     err = broker->GetById(8, &fi, group_id, nullptr);
     M_AssertTrue(err == nullptr, "GetById error");
     M_AssertTrue(fi.name == "8", "GetById filename");
 
+    err = broker->GetNext(&fi, group_id, nullptr);
+    M_AssertTrue(err == nullptr, "GetNext After GetById  no error");
+    M_AssertTrue(fi.name == "3", "GetNext After GetById filename");
+
+
+    err = broker->GetLast(&fi, group_id, nullptr);
+    M_AssertTrue(err == nullptr, "GetLast2 no error");
+
+
+    err = broker->SetLastReadMarker(8, group_id);
+    M_AssertTrue(err == nullptr, "SetLastReadMarker 2 no error");
+
+
     err = broker->GetNext(&fi, group_id, nullptr);
     M_AssertTrue(err == nullptr, "GetNext3 no error");
     M_AssertTrue(fi.name == "9", "GetNext3 filename");
@@ -71,8 +85,8 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str
     M_AssertTrue(err == nullptr, "GetNDataSets no error");
     M_AssertTrue(size == 10, "GetNDataSets size");
 
-    err = broker->ResetCounter(group_id);
-    M_AssertTrue(err == nullptr, "ResetCounter");
+    err = broker->ResetLastReadMarker(group_id);
+    M_AssertTrue(err == nullptr, "SetLastReadMarker");
 
     err = broker->GetNext(&fi, group_id, nullptr);
     M_AssertTrue(err == nullptr, "GetNext4 no error");
diff --git a/tests/automatic/worker/worker_api_python/worker_api.py b/tests/automatic/worker/worker_api_python/worker_api.py
index c7086e9b3..9badee467 100644
--- a/tests/automatic/worker/worker_api_python/worker_api.py
+++ b/tests/automatic/worker/worker_api_python/worker_api.py
@@ -63,8 +63,8 @@ def check_single(broker,group_id_new):
     assert_eq(size,5,"get_ndatasets")
 
 
-    err = broker.reset_counter(group_id_new)
-    assert_noterr(err, "reset_counter")
+    err = broker.reset_lastread_marker(group_id_new)
+    assert_noterr(err, "reset_lastread_marker")
 
     _, meta, err = broker.get_next(group_id_new, meta_only=True)
     assert_noterr(err, "get_next4")
@@ -79,10 +79,19 @@ def check_single(broker,group_id_new):
 
     _, meta, err = broker.get_next(group_id_new, meta_only=True)
     assert_noterr(err, "get_next5")
-    assert_metaname(meta,"4","get next5")
+    assert_metaname(meta,"2","get next5")
     assert_usermetadata(meta,"get next5")
 
 
+    err = broker.set_lastread_marker(4, group_id_new)
+    assert_noterr(err, "set_lastread_marker")
+
+    _, meta, err = broker.get_next(group_id_new, meta_only=True)
+    assert_noterr(err, "get_next6")
+    assert_metaname(meta,"5","get next6")
+    assert_usermetadata(meta,"get next6")
+
+
     images,err = broker.query_images("meta.test = 10")
     assert_noterr(err, "query1")
     assert_eq(len(images),5,"size of query answer 1")
@@ -138,7 +147,7 @@ def check_dataset(broker,group_id_new):
     assert_metaname(metas[2],"8_3","get get_dataset_by_id1 name3")
 
     id, metas, err = broker.get_next_dataset(group_id_new)
-    assert_eq(id,9,"get_next_dataset4 id")
+    assert_eq(id,None,"get_next_dataset4 id")
 
 
 source, path, beamtime, token, mode = sys.argv[1:]
diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h
index 9629b3389..e82086357 100644
--- a/worker/api/cpp/include/worker/data_broker.h
+++ b/worker/api/cpp/include/worker/data_broker.h
@@ -19,7 +19,8 @@ class DataBroker {
       \param group_id - group id to use.
       \return nullptr of command was successful, otherwise error.
     */
-    virtual Error ResetCounter(std::string group_id) = 0;
+    virtual Error ResetLastReadMarker(std::string group_id) = 0;
+    virtual Error SetLastReadMarker(uint64_t value, std::string group_id) = 0;
 
     //! Set timeout for broker operations. Default - no timeout
     virtual void SetTimeout(uint64_t timeout_ms) = 0;
diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp
index cdc0fb1fa..24f574a7b 100644
--- a/worker/api/cpp/src/folder_data_broker.cpp
+++ b/worker/api/cpp/src/folder_data_broker.cpp
@@ -90,7 +90,7 @@ std::string FolderDataBroker::GenerateNewGroupId(Error* err) {
     *err = nullptr;
     return "";
 }
-Error FolderDataBroker::ResetCounter(std::string group_id) {
+Error FolderDataBroker::ResetLastReadMarker(std::string group_id) {
     std::lock_guard<std::mutex> lock{mutex_};
     current_file_ = -1;
     return nullptr;
@@ -125,6 +125,10 @@ DataSet FolderDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro
     *err = TextError("Not supported for folder data broker");
     return {0, FileInfos{}};
 }
-
+Error FolderDataBroker::SetLastReadMarker(uint64_t value, std::string group_id) {
+    std::lock_guard<std::mutex> lock{mutex_};
+    current_file_ = value - 1;
+    return nullptr;
+}
 
 }
diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h
index 372ef2d51..662800180 100644
--- a/worker/api/cpp/src/folder_data_broker.h
+++ b/worker/api/cpp/src/folder_data_broker.h
@@ -14,7 +14,8 @@ class FolderDataBroker final : public asapo::DataBroker {
   public:
     explicit FolderDataBroker(const std::string& source_name);
     Error Connect() override;
-    Error ResetCounter(std::string group_id) override;
+    Error ResetLastReadMarker(std::string group_id) override;
+    Error SetLastReadMarker(uint64_t value, std::string group_id) override;
     Error GetNext(FileInfo* info, std::string group_id, FileData* data) override;
     Error GetLast(FileInfo* info, std::string group_id, FileData* data) override;
     void SetTimeout(uint64_t timeout_ms) override {}; // to timeout in this case
diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp
index 6f00f5529..4dcc2492a 100644
--- a/worker/api/cpp/src/server_data_broker.cpp
+++ b/worker/api/cpp/src/server_data_broker.cpp
@@ -255,7 +255,7 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro
         if (*err == nullptr) {
             request.host = current_broker_uri_;
             *err = ProcessRequest(&response, request);
-            if (*err == nullptr || (*err)->GetErrorType() == ErrorType::kEndOfFile || (*err) == WorkerErrorTemplates::kWrongInput) {
+            if (*err == nullptr || (*err) == WorkerErrorTemplates::kWrongInput) {
                 return response;
             }
         }
@@ -266,10 +266,12 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro
     return "";
 }
 
-Error ServerDataBroker::ResetCounter(std::string group_id) {
+
+Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(
                  group_id) + "/resetcounter";
+    ri.extra_params = "&value=" + std::to_string(value);
     ri.post = true;
 
     Error err;
@@ -277,6 +279,11 @@ Error ServerDataBroker::ResetCounter(std::string group_id) {
     return err;
 }
 
+
+Error ServerDataBroker::ResetLastReadMarker(std::string group_id) {
+    return SetLastReadMarker(0, group_id);
+}
+
 uint64_t ServerDataBroker::GetNDataSets(Error* err) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/size";
@@ -303,7 +310,6 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(
                  group_id) + "/" + std::to_string(id);
-    ri.extra_params = "&reset=true";
     if (dataset) {
         ri.extra_params += "&dataset=true";
     }
@@ -391,5 +397,4 @@ DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro
     return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), err);
 }
 
-
 }
diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h
index e87bb9968..3144e7785 100644
--- a/worker/api/cpp/src/server_data_broker.h
+++ b/worker/api/cpp/src/server_data_broker.h
@@ -29,7 +29,8 @@ class ServerDataBroker final : public asapo::DataBroker {
   public:
     explicit ServerDataBroker(std::string server_uri, std::string source_path, SourceCredentials source);
     Error Connect() override;
-    Error ResetCounter(std::string group_id) override;
+    Error ResetLastReadMarker(std::string group_id) override;
+    Error SetLastReadMarker(uint64_t value, std::string group_id) override;
     Error GetNext(FileInfo* info, std::string group_id, FileData* data) override;
     Error GetLast(FileInfo* info, std::string group_id, FileData* data) override;
     std::string GenerateNewGroupId(Error* err) override;
diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp
index d72d46166..664d34b11 100644
--- a/worker/api/cpp/unittests/test_folder_broker.cpp
+++ b/worker/api/cpp/unittests/test_folder_broker.cpp
@@ -209,7 +209,7 @@ TEST_F(FolderDataBrokerTests, SecondNextReturnsSameFileInfoIfReset) {
     FileInfo fi;
     data_broker->GetNext(&fi, "", nullptr);
 
-    auto err = data_broker->ResetCounter("");
+    auto err = data_broker->ResetLastReadMarker("");
     ASSERT_THAT(err, Eq(nullptr));
 
     err = data_broker->GetNext(&fi, "", nullptr);
diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp
index 092aa793b..514d6eec0 100644
--- a/worker/api/cpp/unittests/test_server_broker.cpp
+++ b/worker/api/cpp/unittests/test_server_broker.cpp
@@ -421,18 +421,31 @@ TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) {
     ASSERT_THAT(groupid, Eq(expected_group_id));
 }
 
+TEST_F(ServerDataBrokerTests, ResetCounterByDefaultUsesCorrectUri) {
+    MockGetBrokerUri();
+    data_broker->SetTimeout(100);
+
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                         expected_group_id +
+                                         "/resetcounter?token=" + expected_token + "&value=0", _, _, _)).WillOnce(DoAll(
+                                                     SetArgPointee<2>(HttpCode::OK),
+                                                     SetArgPointee<3>(nullptr),
+                                                     Return("")));
+    auto err = data_broker->ResetLastReadMarker(expected_group_id);
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
 TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
                                          expected_group_id +
-                                         "/resetcounter?token="
-                                         + expected_token, _, _, _)).WillOnce(DoAll(
+                                         "/resetcounter?token=" + expected_token + "&value=10", _, _, _)).WillOnce(DoAll(
                                                      SetArgPointee<2>(HttpCode::OK),
                                                      SetArgPointee<3>(nullptr),
                                                      Return("")));
-    auto err = data_broker->ResetCounter(expected_group_id);
+    auto err = data_broker->SetLastReadMarker(10, expected_group_id);
     ASSERT_THAT(err, Eq(nullptr));
 }
 
@@ -494,7 +507,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
                                         expected_group_id
                                         + "/" + std::to_string(
                                             expected_dataset_id) + "?token="
-                                        + expected_token + "&reset=true", _,
+                                        + expected_token, _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -504,7 +517,23 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
 
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(info.name, Eq(to_send.name));
+}
 
+
+TEST_F(ServerDataBrokerTests, GetByIdTimeouts) {
+    MockGetBrokerUri();
+    data_broker->SetTimeout(10);
+
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"  +
+                                        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
+                                        + expected_token, _, _)).WillOnce(DoAll(
+                                                    SetArgPointee<1>(HttpCode::Conflict),
+                                                    SetArgPointee<2>(nullptr),
+                                                    Return("")));
+
+    auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr);
+
+    ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout));
 }
 
 TEST_F(ServerDataBrokerTests, GetMetaDataOK) {
@@ -719,7 +748,7 @@ TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) {
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
                                         expected_group_id +
                                         "/" + std::to_string(expected_dataset_id) + "?token="
-                                        + expected_token + "&reset=true&dataset=true", _,
+                                        + expected_token + "&dataset=true", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd
index 3146e51f7..eea83b8c8 100644
--- a/worker/api/python/asapo_worker.pxd
+++ b/worker/api/python/asapo_worker.pxd
@@ -43,7 +43,8 @@ cdef extern from "asapo_worker.h" namespace "asapo" nogil:
         Error GetLast(FileInfo* info, string group_id, FileData* data)
         Error GetById(uint64_t id, FileInfo* info, string group_id, FileData* data)
         uint64_t GetNDataSets(Error* err)
-        Error ResetCounter(string group_id)
+        Error SetLastReadMarker(uint64_t value, string group_id)
+        Error ResetLastReadMarker(string group_id)
         string GenerateNewGroupId(Error* err)
         string GetBeamtimeMeta(Error* err)
         FileInfos QueryImages(string query, Error* err)
diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in
index cd19e16d5..8ea255691 100644
--- a/worker/api/python/asapo_worker.pyx.in
+++ b/worker/api/python/asapo_worker.pyx.in
@@ -93,11 +93,22 @@ cdef class PyDataBroker:
             return None,err_str
         else:
             return size,None
-    def reset_counter(self,group_id):
+    def set_lastread_marker(self,value,group_id):
         cdef string b_group_id = _bytes(group_id)
         cdef Error err
+        cdef uint64_t id = value
         with nogil:
-            err =  self.c_broker.ResetCounter(b_group_id)
+            err =  self.c_broker.SetLastReadMarker(id,b_group_id)
+        err_str = _str(GetErrorString(&err))
+        if err_str.strip():
+            return err_str
+        else:
+            return None
+    def reset_lastread_marker(self,group_id):
+        cdef string b_group_id = _bytes(group_id)
+        cdef Error err
+        with nogil:
+            err =  self.c_broker.ResetLastReadMarker(b_group_id)
         err_str = _str(GetErrorString(&err))
         if err_str.strip():
             return err_str
-- 
GitLab