From 84ade43e3a97ad400bdbfd1b99bd4c7e425012e7 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Thu, 4 Apr 2019 17:16:11 +0200
Subject: [PATCH] added more functions and tests

---
 broker/src/asapo_broker/database/mongodb.go   | 14 ++++
 .../src/asapo_broker/database/mongodb_test.go | 22 +++++
 tests/automatic/common/cpp/include/testing.h  |  2 +-
 tests/automatic/common/cpp/src/testing.cpp    |  4 +-
 tests/automatic/worker/CMakeLists.txt         |  2 +
 .../worker/worker_api/CMakeLists.txt          | 17 ++++
 .../worker/worker_api/check_linux.sh          | 32 ++++++++
 .../worker/worker_api/check_windows.bat       | 28 +++++++
 .../worker/worker_api/worker_api.cpp          | 82 +++++++++++++++++++
 .../worker/worker_api_python/CMakeLists.txt   | 14 ++++
 .../worker/worker_api_python/check_linux.sh   | 38 +++++++++
 .../worker_api_python/check_windows.bat       | 29 +++++++
 .../worker/worker_api_python/worker_api.py    | 72 ++++++++++++++++
 worker/api/cpp/include/worker/data_broker.h   |  2 +-
 worker/api/cpp/src/folder_data_broker.cpp     |  2 +-
 worker/api/cpp/src/folder_data_broker.h       |  2 +-
 worker/api/cpp/src/server_data_broker.cpp     |  4 +-
 worker/api/cpp/src/server_data_broker.h       |  2 +-
 .../api/cpp/unittests/test_folder_broker.cpp  |  6 +-
 .../api/cpp/unittests/test_server_broker.cpp  | 10 ++-
 worker/api/python/asapo_worker.pxd            |  3 +
 worker/api/python/asapo_worker.pyx.in         | 26 +++++-
 22 files changed, 394 insertions(+), 19 deletions(-)
 create mode 100644 tests/automatic/worker/worker_api/CMakeLists.txt
 create mode 100644 tests/automatic/worker/worker_api/check_linux.sh
 create mode 100644 tests/automatic/worker/worker_api/check_windows.bat
 create mode 100644 tests/automatic/worker/worker_api/worker_api.cpp
 create mode 100644 tests/automatic/worker/worker_api_python/CMakeLists.txt
 create mode 100644 tests/automatic/worker/worker_api_python/check_linux.sh
 create mode 100644 tests/automatic/worker/worker_api_python/check_windows.bat
 create mode 100644 tests/automatic/worker/worker_api_python/worker_api.py

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 951f77ee6..b1129ebcf 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -317,6 +317,18 @@ func (db *Mongodb) GetSize(db_name string) ([]byte, error) {
 	return json.Marshal(&rec)
 }
 
+func (db *Mongodb) ResetCounter(db_name string, group_id string) ([]byte, error) {
+
+	if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil {
+		return nil, err
+	}
+
+	err := db.setCounter(db_name, group_id, 0)
+
+	return []byte(""), err
+}
+
+
 func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, id int) (answer []byte, err error) {
 	switch op {
 	case "next":
@@ -327,6 +339,8 @@ func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, id
 		return db.GetRecordByID(db_name, group_id, id, true, true)
 	case "last":
 		return db.GetLastRecord(db_name, group_id)
+	case "resetcounter":
+		return db.ResetCounter(db_name, group_id)
 	case "size":
 		return db.GetSize(db_name)
 	}
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 19a5db2bf..a5500e9dd 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -285,3 +285,25 @@ func TestMongoDBGetRecordByIDNotConnected(t *testing.T) {
 	_, err := db.GetRecordByID(dbname, "", 2, true, false)
 	assert.Equal(t, utils.StatusError, err.(*DBError).Code)
 }
+
+func TestMongoDBResetCounter(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.InsertRecord(dbname, &rec1)
+	db.InsertRecord(dbname, &rec2)
+
+	res1, err1 := db.ProcessRequest(dbname, groupId, "next", 0)
+
+	assert.Nil(t, err1)
+	assert.Equal(t, string(rec1_expect), string(res1))
+
+	_,err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", 0)
+	assert.Nil(t, err_reset)
+
+	res2, err2 := db.ProcessRequest(dbname, groupId, "next", 0)
+
+
+	assert.Nil(t, err2)
+	assert.Equal(t, string(rec1_expect), string(res2))
+
+}
diff --git a/tests/automatic/common/cpp/include/testing.h b/tests/automatic/common/cpp/include/testing.h
index 55c6ffe32..da20949e4 100644
--- a/tests/automatic/common/cpp/include/testing.h
+++ b/tests/automatic/common/cpp/include/testing.h
@@ -7,7 +7,7 @@ namespace asapo {
 
 void M_AssertEq(const std::string& expected, const std::string& got);
 void M_AssertEq(int expected, int got);
-void M_AssertTrue(bool value);
+void M_AssertTrue(bool value,std::string name = "");
 void M_AssertContains(const std::string& whole, const std::string& sub);
 
 
diff --git a/tests/automatic/common/cpp/src/testing.cpp b/tests/automatic/common/cpp/src/testing.cpp
index d3b9f97c6..ab47a5584 100644
--- a/tests/automatic/common/cpp/src/testing.cpp
+++ b/tests/automatic/common/cpp/src/testing.cpp
@@ -15,9 +15,9 @@ void T_AssertEq(const T& expected, const T& got) {
     }
 }
 
-void M_AssertTrue(bool value) {
+void M_AssertTrue(bool value, std::string name) {
     if (!value) {
-        std::cerr << "Assert failed:\n"
+        std::cerr << "Assert failed: "<<name<<"\n"
                   << "Expected:\t'" << "1" << "'\n"
                   << "Obtained:\t'" << value << "'\n";
         exit(EXIT_FAILURE);
diff --git a/tests/automatic/worker/CMakeLists.txt b/tests/automatic/worker/CMakeLists.txt
index f8a2dd315..d17eb49e8 100644
--- a/tests/automatic/worker/CMakeLists.txt
+++ b/tests/automatic/worker/CMakeLists.txt
@@ -3,6 +3,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures
 add_subdirectory(next_multithread_folder)
 add_subdirectory(next_multithread_broker)
 add_subdirectory(connect_multithread)
+add_subdirectory(worker_api)
+add_subdirectory(worker_api_python)
 
 if(BUILD_WORKER_TOOLS)
     add_subdirectory(folder_to_db)
diff --git a/tests/automatic/worker/worker_api/CMakeLists.txt b/tests/automatic/worker/worker_api/CMakeLists.txt
new file mode 100644
index 000000000..ddef84696
--- /dev/null
+++ b/tests/automatic/worker/worker_api/CMakeLists.txt
@@ -0,0 +1,17 @@
+set(TARGET_NAME worker_api)
+set(SOURCE_FILES worker_api.cpp)
+
+
+################################
+# Executable and link
+################################
+add_executable(${TARGET_NAME} ${SOURCE_FILES})
+target_link_libraries(${TARGET_NAME} test_common asapo-worker)
+
+################################
+# Testing
+################################
+prepare_asapo()
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>"
+        )
+
diff --git a/tests/automatic/worker/worker_api/check_linux.sh b/tests/automatic/worker/worker_api/check_linux.sh
new file mode 100644
index 000000000..714e044d6
--- /dev/null
+++ b/tests/automatic/worker/worker_api/check_linux.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+database_name=test_run
+token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
+
+set -e
+
+trap Cleanup EXIT
+
+Cleanup() {
+    set +e
+    nomad stop nginx
+    nomad stop discovery
+    nomad stop broker
+	echo "db.dropDatabase()" | mongo ${database_name}
+}
+
+
+nomad run nginx.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+
+sleep 1
+
+for i in `seq 1 10`;
+do
+	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0})' | mongo ${database_name}
+done
+
+$@ 127.0.0.1:8400 $database_name $token_test_run
+
+
diff --git a/tests/automatic/worker/worker_api/check_windows.bat b/tests/automatic/worker/worker_api/check_windows.bat
new file mode 100644
index 000000000..cfc4255c6
--- /dev/null
+++ b/tests/automatic/worker/worker_api/check_windows.bat
@@ -0,0 +1,28 @@
+SET database_name=test_run
+SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
+
+::first argument  path to the executable
+
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+
+ping 1.0.0.0 -n 10 -w 100 > nul
+
+for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0}) | %mongo_exe% %database_name%  || goto :error
+
+
+%1 127.0.0.1:8400 %database_name% %token_test_run% || goto :error
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop nginx
+echo db.dropDatabase() | %mongo_exe% %database_name%
diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp
new file mode 100644
index 000000000..66702c973
--- /dev/null
+++ b/tests/automatic/worker/worker_api/worker_api.cpp
@@ -0,0 +1,82 @@
+#include <iostream>
+#include <vector>
+#include <thread>
+#include <algorithm>
+#include "worker/data_broker.h"
+#include "testing.h"
+
+using asapo::M_AssertEq;
+using asapo::M_AssertTrue;
+
+
+struct Args {
+    std::string server;
+    std::string run_name;
+    std::string token;
+};
+
+Args GetArgs(int argc, char* argv[]) {
+    if (argc != 4) {
+        std::cout << "Wrong number of arguments" << std::endl;
+        exit(EXIT_FAILURE);
+    }
+    std::string server{argv[1]};
+    std::string source_name{argv[2]};
+    std::string token{argv[3]};
+
+    return Args{server, source_name, token};
+}
+
+void GetAllFromBroker(const Args& args) {
+    asapo::Error err;
+    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err);
+    broker->SetTimeout(100);
+    auto group_id = broker->GenerateNewGroupId(&err);
+    asapo::FileInfo fi;
+    err = broker->GetNext(&fi, group_id, nullptr);
+    M_AssertTrue(err==nullptr, "GetNext no error");
+    M_AssertTrue(fi.name == "1", "GetNext filename");
+
+    err = broker->GetLast(&fi, group_id, nullptr);
+    M_AssertTrue(err==nullptr,"GetLast no error");
+    M_AssertTrue(fi.name == "10","GetLast filename");
+
+    err = broker->GetNext(&fi, group_id, nullptr);
+    M_AssertTrue(err!=nullptr,"GetNext2 error");
+
+    err = broker->GetLast(&fi, group_id, nullptr);
+    M_AssertTrue(err==nullptr,"GetLast2 no error");
+
+    err = broker->GetById(8, &fi, group_id, nullptr);
+    M_AssertTrue(err==nullptr,"GetById error");
+    M_AssertTrue(fi.name == "8","GetById filename");
+
+    err = broker->GetNext(&fi, group_id, nullptr);
+    M_AssertTrue(err==nullptr, "GetNext3 no error");
+    M_AssertTrue(fi.name == "9", "GetNext3 filename");
+
+    auto size = broker->GetNDataSets(&err);
+    M_AssertTrue(err==nullptr, "GetNDataSets no error");
+    M_AssertTrue(size == 10, "GetNDataSets size");
+
+    err = broker->ResetCounter(group_id);
+    M_AssertTrue(err==nullptr, "ResetCounter");
+
+    err = broker->GetNext(&fi, group_id, nullptr);
+    M_AssertTrue(err==nullptr, "GetNext4 no error");
+    M_AssertTrue(fi.name == "1", "GetNext4 filename");
+
+    auto group_id2 = broker->GenerateNewGroupId(&err);
+    err = broker->GetNext(&fi, group_id2, nullptr);
+    M_AssertTrue(err==nullptr, "GetNext5 no error");
+    M_AssertTrue(fi.name == "1", "GetNext5  filename");
+
+}
+
+int main(int argc, char* argv[]) {
+
+    auto args = GetArgs(argc, argv);
+
+    GetAllFromBroker(args);
+    return 0;
+}
diff --git a/tests/automatic/worker/worker_api_python/CMakeLists.txt b/tests/automatic/worker/worker_api_python/CMakeLists.txt
new file mode 100644
index 000000000..7e6342c6f
--- /dev/null
+++ b/tests/automatic/worker/worker_api_python/CMakeLists.txt
@@ -0,0 +1,14 @@
+set(TARGET_NAME worker_api_python)
+
+
+prepare_asapo()
+
+if (UNIX)
+    get_target_property(PYTHON_LIBS python-lib2 BINARY_DIR)
+else()
+    get_target_property(PYTHON_LIBS asapo_worker BINARY_DIR)
+endif()
+
+add_script_test("${TARGET_NAME}" ${PYTHON_LIBS} nomem)
+configure_file(worker_api.py worker_api.py COPYONLY)
+
diff --git a/tests/automatic/worker/worker_api_python/check_linux.sh b/tests/automatic/worker/worker_api_python/check_linux.sh
new file mode 100644
index 000000000..5cc700528
--- /dev/null
+++ b/tests/automatic/worker/worker_api_python/check_linux.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+source_path=dummy
+database_name=test_run
+token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
+set -e
+
+
+
+trap Cleanup EXIT
+
+Cleanup() {
+    set +e
+    nomad stop nginx
+    nomad stop discovery
+    nomad stop broker
+	echo "db.dropDatabase()" | mongo ${database_name}
+}
+
+nomad run nginx.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+
+for i in `seq 1 5`;
+do
+	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0})' | mongo ${database_name}
+done
+
+sleep 1
+
+export PYTHONPATH=$1:${PYTHONPATH}
+
+python worker_api.py 127.0.0.1:8400 $source_path $database_name $token_test_run
+
+
+
+
+
diff --git a/tests/automatic/worker/worker_api_python/check_windows.bat b/tests/automatic/worker/worker_api_python/check_windows.bat
new file mode 100644
index 000000000..66594fcb6
--- /dev/null
+++ b/tests/automatic/worker/worker_api_python/check_windows.bat
@@ -0,0 +1,29 @@
+SET source_path=dummy
+SET database_name=test_run
+
+SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
+
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+
+ping 1.0.0.0 -n 10 -w 100 > nul
+
+for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0}) | %mongo_exe% %database_name%  || goto :error
+
+set PYTHONPATH=%1
+
+python worker_api.py 127.0.0.1:8400 %source_path% %database_name%  %token_test_run% || goto :error
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop nginx
+echo db.dropDatabase() | %mongo_exe% %database_name%
diff --git a/tests/automatic/worker/worker_api_python/worker_api.py b/tests/automatic/worker/worker_api_python/worker_api.py
new file mode 100644
index 000000000..9e94106e6
--- /dev/null
+++ b/tests/automatic/worker/worker_api_python/worker_api.py
@@ -0,0 +1,72 @@
+from __future__ import print_function
+
+import asapo_worker
+import json
+import sys
+
+def assert_err(err,name):
+    if err != None:
+        print (name + ' err: ', err)
+        sys.exit(1)
+
+def assert_noterr(err,name):
+    if err == None:
+        print (name + ' err: ', err)
+        sys.exit(1)
+
+
+def assert_metaname(meta,compare,name):
+    if meta['name'] != compare:
+        print ("error at "+name)
+        print ('meta: ', json.dumps(meta, indent=4, sort_keys=True))
+        sys.exit(1)
+
+def assert_eq(val,expected,name):
+    if val != expected:
+        print ("error at "+name)
+        print ('val: ', val,' expected: ',expected)
+        sys.exit(1)
+
+
+source, path, beamtime, token = sys.argv[1:]
+
+broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000)
+
+group_id_new, err = broker.generate_group_id()
+assert_err(err,"generate_group")
+
+_, meta, err = broker.get_next(group_id_new, meta_only=True)
+assert_err(err,"get_next")
+assert_metaname(meta,"1","get next1")
+
+_, meta, err = broker.get_next(group_id_new, meta_only=True)
+assert_err(err,"get_next2")
+assert_metaname(meta,"2","get next2")
+
+_, meta, err = broker.get_last(group_id_new, meta_only=True)
+assert_err(err,"get_last1")
+assert_metaname(meta,"5","get last1")
+
+_, meta, err = broker.get_next(group_id_new, meta_only=True)
+assert_noterr(err,"get_next3")
+
+size,err = broker.get_ndatasets()
+assert_err(err,"get_ndatasets")
+assert_eq(size,5,"get_ndatasets")
+
+
+err = broker.reset_counter(group_id_new)
+assert_err(err,"reset_counter")
+
+_, meta, err = broker.get_next(group_id_new, meta_only=True)
+assert_err(err,"get_next4")
+assert_metaname(meta,"1","get next4")
+
+
+_, meta, err = broker.get_by_id(3, group_id_new, meta_only=True)
+assert_err(err,"get_by_id")
+assert_metaname(meta,"3","get get_by_id")
+
+_, meta, err = broker.get_next(group_id_new, meta_only=True)
+assert_err(err,"get_next5")
+assert_metaname(meta,"4","get next5")
diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h
index 042f85478..ea4123b24 100644
--- a/worker/api/cpp/include/worker/data_broker.h
+++ b/worker/api/cpp/include/worker/data_broker.h
@@ -74,7 +74,7 @@ class DataBroker {
       \param data - where to store image data. Can be set to nullptr only image metadata is needed.
       \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
     */
-    virtual Error GetById(uint64_t id, FileInfo* info, FileData* data) = 0;
+    virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) = 0;
 
 
     //! Receive last available image.
diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp
index 629d03689..845c0d4cd 100644
--- a/worker/api/cpp/src/folder_data_broker.cpp
+++ b/worker/api/cpp/src/folder_data_broker.cpp
@@ -89,7 +89,7 @@ uint64_t FolderDataBroker::GetNDataSets(Error* err) {
     return filelist_.size();
 }
 
-Error FolderDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) {
+Error FolderDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) {
     return GetFileByIndex(id - 1 , info, data);
 }
 
diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h
index 8c06cde2b..4c4ec1770 100644
--- a/worker/api/cpp/src/folder_data_broker.h
+++ b/worker/api/cpp/src/folder_data_broker.h
@@ -21,7 +21,7 @@ class FolderDataBroker final : public asapo::DataBroker {
     std::string GenerateNewGroupId(Error* err)
     override; // return "0" always and no error - no group ids for folder datra broker
     uint64_t GetNDataSets(Error* err) override;
-    Error GetById(uint64_t id, FileInfo* info, FileData* data) override;
+    Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override;
     std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch
   private:
     std::string base_path_;
diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp
index 6435c6931..836b39af7 100644
--- a/worker/api/cpp/src/server_data_broker.cpp
+++ b/worker/api/cpp/src/server_data_broker.cpp
@@ -252,8 +252,8 @@ uint64_t ServerDataBroker::GetNDataSets(Error* err) {
     return size;
 
 }
-Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) {
-    std::string request_string =  "database/" + source_name_ + "/" + std::to_string(id);
+Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) {
+    std::string request_string =  "database/" + source_name_ + "/" + std::move(group_id)+ "/" + std::to_string(id);
     std::string extra_params =  "&reset=true";
     Error err;
     auto responce = BrokerRequestWithTimeout(request_string, extra_params, false, &err);
diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h
index 9a3eeb9bd..a44035d58 100644
--- a/worker/api/cpp/src/server_data_broker.h
+++ b/worker/api/cpp/src/server_data_broker.h
@@ -24,7 +24,7 @@ class ServerDataBroker final : public asapo::DataBroker {
     Error GetLast(FileInfo* info, std::string group_id, FileData* data) override;
     std::string GenerateNewGroupId(Error* err) override;
     uint64_t GetNDataSets(Error* err) override;
-    Error GetById(uint64_t id, FileInfo* info, FileData* data) override;
+    Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override;
     void SetTimeout(uint64_t timeout_ms) override;
     std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch
     std::unique_ptr<HttpClient> httpclient__;
diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp
index dc415e3cc..c7eb31500 100644
--- a/worker/api/cpp/unittests/test_folder_broker.cpp
+++ b/worker/api/cpp/unittests/test_folder_broker.cpp
@@ -307,7 +307,7 @@ TEST_F(FolderDataBrokerTests, GetByIdReturnsFileInfo) {
     data_broker->Connect();
     FileInfo fi;
 
-    auto err = data_broker->GetById(1, &fi, nullptr);
+    auto err = data_broker->GetById(1, &fi,"",nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(fi.name, Eq("1"));
@@ -319,8 +319,8 @@ TEST_F(FolderDataBrokerTests, GetByIdReturnsError) {
     data_broker->Connect();
     FileInfo fi;
 
-    auto err1 = data_broker->GetById(0, &fi, nullptr);
-    auto err2 = data_broker->GetById(10, &fi, nullptr);
+    auto err1 = data_broker->GetById(0, &fi,"", nullptr);
+    auto err2 = data_broker->GetById(10, &fi,"", nullptr);
 
     ASSERT_THAT(err1, Ne(nullptr));
     ASSERT_THAT(err2, Ne(nullptr));
diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp
index 68ad645f4..63034e5ce 100644
--- a/worker/api/cpp/unittests/test_server_broker.cpp
+++ b/worker/api/cpp/unittests/test_server_broker.cpp
@@ -454,7 +454,8 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
     auto to_send = CreateFI();
     auto json = to_send.Json();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string(
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/"  + expected_group_id
+                                            + "/" + std::to_string(
                                             expected_dataset_id) + "?token="
                                         + expected_token+"&reset=true", _,
                                         _)).WillOnce(DoAll(
@@ -462,7 +463,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
                                                 SetArgPointee<2>(nullptr),
                                                 Return(json)));
 
-    auto err = data_broker->GetById(expected_dataset_id, &info, nullptr);
+    auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(info.name, Eq(to_send.name));
@@ -475,7 +476,8 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsNoData) {
     auto to_send = CreateFI();
     auto json = to_send.Json();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string(
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id
+                                            + "/" + std::to_string(
                                             expected_dataset_id) + "?token="
                                         + expected_token+"&reset=true", _,
                                         _)).WillOnce(DoAll(
@@ -483,7 +485,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsNoData) {
                                                 SetArgPointee<2>(nullptr),
                                                 Return("{\"id\":1}")));
 
-    auto err = data_broker->GetById(expected_dataset_id, &info, nullptr);
+    auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr);
 
     ASSERT_THAT(err->GetErrorType(), Eq(asapo::ErrorType::kEndOfFile));
 
diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd
index 025c84eee..05e8d826b 100644
--- a/worker/api/python/asapo_worker.pxd
+++ b/worker/api/python/asapo_worker.pxd
@@ -30,6 +30,9 @@ cdef extern from "asapo_worker.h" namespace "asapo":
         void SetTimeout(uint64_t timeout_ms)
         Error GetNext(FileInfo* info, string group_id, FileData* data)
         Error GetLast(FileInfo* info, string group_id, FileData* data)
+        Error GetById(uint64_t id, FileInfo* info, string group_id, FileData* data)
+        uint64_t GetNDataSets(Error* err)
+        Error ResetCounter(string group_id)
         string GenerateNewGroupId(Error* err)
 
 cdef extern from "asapo_worker.h" namespace "asapo":
diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in
index 2bb14cded..38bc41ec9 100644
--- a/worker/api/python/asapo_worker.pyx.in
+++ b/worker/api/python/asapo_worker.pyx.in
@@ -26,7 +26,7 @@ cdef bytes _bytes(s):
 
 cdef class PyDataBroker:
     cdef DataBroker* c_broker
-    def _op(self, op, group_id, meta_only):
+    def _op(self, op, group_id, meta_only,id):
         cdef FileInfo info
         cdef FileData data
         cdef Error err
@@ -35,6 +35,8 @@ cdef class PyDataBroker:
             err =  self.c_broker.GetNext(&info, _bytes(group_id), <FileData*>NULL if meta_only else &data)
         elif op == "last":
             err =  self.c_broker.GetLast(&info,_bytes(group_id), <FileData*>NULL if meta_only else &data)
+        elif op == "id":
+            err =  self.c_broker.GetById(id, &info,_bytes(group_id), <FileData*>NULL if meta_only else &data)
         err_str = _str(GetErrorString(&err))
         if err_str.strip():
             return None,None,err_str
@@ -50,9 +52,27 @@ cdef class PyDataBroker:
         arr =  np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
         return arr,meta,None
     def get_next(self, group_id, meta_only = True):
-        return self._op("next",group_id,meta_only)
+        return self._op("next",group_id,meta_only,0)
     def get_last(self, group_id, meta_only = True):
-        return self._op("last",group_id,meta_only)
+        return self._op("last",group_id,meta_only,0)
+    def get_by_id(self,id,group_id,meta_only = True):
+        return self._op("id",group_id,meta_only,id)
+    def get_ndatasets(self):
+        cdef Error err
+        size =  self.c_broker.GetNDataSets(&err)
+        err_str = _str(GetErrorString(&err))
+        if err_str.strip():
+            return None,err_str
+        else:
+            return size,None
+    def reset_counter(self,group_id):
+        cdef Error err
+        err =  self.c_broker.ResetCounter(_bytes(group_id))
+        err_str = _str(GetErrorString(&err))
+        if err_str.strip():
+            return err_str
+        else:
+            return None
     def generate_group_id(self):
         cdef Error err
         cdef string group_id
-- 
GitLab