diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 951f77ee6009143ea44ccac52c83780ba568802c..b1129ebcffc3b33b31ebaea06e4a457313087e7f 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -317,6 +317,18 @@ func (db *Mongodb) GetSize(db_name string) ([]byte, error) { return json.Marshal(&rec) } +func (db *Mongodb) ResetCounter(db_name string, group_id string) ([]byte, error) { + + if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { + return nil, err + } + + err := db.setCounter(db_name, group_id, 0) + + return []byte(""), err +} + + func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, id int) (answer []byte, err error) { switch op { case "next": @@ -327,6 +339,8 @@ func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, id return db.GetRecordByID(db_name, group_id, id, true, true) case "last": return db.GetLastRecord(db_name, group_id) + case "resetcounter": + return db.ResetCounter(db_name, group_id) case "size": return db.GetSize(db_name) } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 19a5db2bf80a05f516ff4fe45d17bbbef341811b..a5500e9ddc5444558eab2c30944de1d94b4a45d9 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -285,3 +285,25 @@ func TestMongoDBGetRecordByIDNotConnected(t *testing.T) { _, err := db.GetRecordByID(dbname, "", 2, true, false) assert.Equal(t, utils.StatusError, err.(*DBError).Code) } + +func TestMongoDBResetCounter(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.InsertRecord(dbname, &rec1) + db.InsertRecord(dbname, &rec2) + + res1, err1 := db.ProcessRequest(dbname, groupId, "next", 0) + + assert.Nil(t, err1) + assert.Equal(t, string(rec1_expect), string(res1)) + + _,err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", 0) + assert.Nil(t, err_reset) + + res2, err2 := db.ProcessRequest(dbname, groupId, "next", 0) + + + assert.Nil(t, err2) + assert.Equal(t, string(rec1_expect), string(res2)) + +} diff --git a/tests/automatic/common/cpp/include/testing.h b/tests/automatic/common/cpp/include/testing.h index 55c6ffe32aaf44528646877975202e8834ce9158..da20949e438c69999ec9d97a0fe9b92c1966a56d 100644 --- a/tests/automatic/common/cpp/include/testing.h +++ b/tests/automatic/common/cpp/include/testing.h @@ -7,7 +7,7 @@ namespace asapo { void M_AssertEq(const std::string& expected, const std::string& got); void M_AssertEq(int expected, int got); -void M_AssertTrue(bool value); +void M_AssertTrue(bool value,std::string name = ""); void M_AssertContains(const std::string& whole, const std::string& sub); diff --git a/tests/automatic/common/cpp/src/testing.cpp b/tests/automatic/common/cpp/src/testing.cpp index d3b9f97c6c2be8d70aada626c92fda5457d75c71..ab47a558451b4f64aa83f41c8cd32617b2ef0b6c 100644 --- a/tests/automatic/common/cpp/src/testing.cpp +++ b/tests/automatic/common/cpp/src/testing.cpp @@ -15,9 +15,9 @@ void T_AssertEq(const T& expected, const T& got) { } } -void M_AssertTrue(bool value) { +void M_AssertTrue(bool value, std::string name) { if (!value) { - std::cerr << "Assert failed:\n" + std::cerr << "Assert failed: "<<name<<"\n" << "Expected:\t'" << "1" << "'\n" << "Obtained:\t'" << value << "'\n"; exit(EXIT_FAILURE); diff --git a/tests/automatic/worker/CMakeLists.txt b/tests/automatic/worker/CMakeLists.txt index f8a2dd31597411f2a2297fc6436cce592cfbd8e9..d17eb49e8389fa4bef6d0ebaae6cbfd89e69ac1a 100644 --- a/tests/automatic/worker/CMakeLists.txt +++ b/tests/automatic/worker/CMakeLists.txt @@ -3,6 +3,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures add_subdirectory(next_multithread_folder) add_subdirectory(next_multithread_broker) add_subdirectory(connect_multithread) +add_subdirectory(worker_api) +add_subdirectory(worker_api_python) if(BUILD_WORKER_TOOLS) add_subdirectory(folder_to_db) diff --git a/tests/automatic/worker/worker_api/CMakeLists.txt b/tests/automatic/worker/worker_api/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..ddef8469695638d686ba469c7317b31ebd92cff9 --- /dev/null +++ b/tests/automatic/worker/worker_api/CMakeLists.txt @@ -0,0 +1,17 @@ +set(TARGET_NAME worker_api) +set(SOURCE_FILES worker_api.cpp) + + +################################ +# Executable and link +################################ +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_link_libraries(${TARGET_NAME} test_common asapo-worker) + +################################ +# Testing +################################ +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>" + ) + diff --git a/tests/automatic/worker/worker_api/check_linux.sh b/tests/automatic/worker/worker_api/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..714e044d6834bbb26bc28fce99407cb4a2b54a1a --- /dev/null +++ b/tests/automatic/worker/worker_api/check_linux.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +database_name=test_run +token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= + +set -e + +trap Cleanup EXIT + +Cleanup() { + set +e + nomad stop nginx + nomad stop discovery + nomad stop broker + echo "db.dropDatabase()" | mongo ${database_name} +} + + +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd + +sleep 1 + +for i in `seq 1 10`; +do + echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0})' | mongo ${database_name} +done + +$@ 127.0.0.1:8400 $database_name $token_test_run + + diff --git a/tests/automatic/worker/worker_api/check_windows.bat b/tests/automatic/worker/worker_api/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..cfc4255c6ec1657323004d2d09adee6fdf2312a5 --- /dev/null +++ b/tests/automatic/worker/worker_api/check_windows.bat @@ -0,0 +1,28 @@ +SET database_name=test_run +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= + +::first argument path to the executable + +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 10 -w 100 > nul + +for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0}) | %mongo_exe% %database_name% || goto :error + + +%1 127.0.0.1:8400 %database_name% %token_test_run% || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx +echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp new file mode 100644 index 0000000000000000000000000000000000000000..66702c97348faf4593e78c696e50933a0f713421 --- /dev/null +++ b/tests/automatic/worker/worker_api/worker_api.cpp @@ -0,0 +1,82 @@ +#include <iostream> +#include <vector> +#include <thread> +#include <algorithm> +#include "worker/data_broker.h" +#include "testing.h" + +using asapo::M_AssertEq; +using asapo::M_AssertTrue; + + +struct Args { + std::string server; + std::string run_name; + std::string token; +}; + +Args GetArgs(int argc, char* argv[]) { + if (argc != 4) { + std::cout << "Wrong number of arguments" << std::endl; + exit(EXIT_FAILURE); + } + std::string server{argv[1]}; + std::string source_name{argv[2]}; + std::string token{argv[3]}; + + return Args{server, source_name, token}; +} + +void GetAllFromBroker(const Args& args) { + asapo::Error err; + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err); + broker->SetTimeout(100); + auto group_id = broker->GenerateNewGroupId(&err); + asapo::FileInfo fi; + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err==nullptr, "GetNext no error"); + M_AssertTrue(fi.name == "1", "GetNext filename"); + + err = broker->GetLast(&fi, group_id, nullptr); + M_AssertTrue(err==nullptr,"GetLast no error"); + M_AssertTrue(fi.name == "10","GetLast filename"); + + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err!=nullptr,"GetNext2 error"); + + err = broker->GetLast(&fi, group_id, nullptr); + M_AssertTrue(err==nullptr,"GetLast2 no error"); + + err = broker->GetById(8, &fi, group_id, nullptr); + M_AssertTrue(err==nullptr,"GetById error"); + M_AssertTrue(fi.name == "8","GetById filename"); + + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err==nullptr, "GetNext3 no error"); + M_AssertTrue(fi.name == "9", "GetNext3 filename"); + + auto size = broker->GetNDataSets(&err); + M_AssertTrue(err==nullptr, "GetNDataSets no error"); + M_AssertTrue(size == 10, "GetNDataSets size"); + + err = broker->ResetCounter(group_id); + M_AssertTrue(err==nullptr, "ResetCounter"); + + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err==nullptr, "GetNext4 no error"); + M_AssertTrue(fi.name == "1", "GetNext4 filename"); + + auto group_id2 = broker->GenerateNewGroupId(&err); + err = broker->GetNext(&fi, group_id2, nullptr); + M_AssertTrue(err==nullptr, "GetNext5 no error"); + M_AssertTrue(fi.name == "1", "GetNext5 filename"); + +} + +int main(int argc, char* argv[]) { + + auto args = GetArgs(argc, argv); + + GetAllFromBroker(args); + return 0; +} diff --git a/tests/automatic/worker/worker_api_python/CMakeLists.txt b/tests/automatic/worker/worker_api_python/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..7e6342c6f157ec21ef186c1d0a8b15873d978e0e --- /dev/null +++ b/tests/automatic/worker/worker_api_python/CMakeLists.txt @@ -0,0 +1,14 @@ +set(TARGET_NAME worker_api_python) + + +prepare_asapo() + +if (UNIX) + get_target_property(PYTHON_LIBS python-lib2 BINARY_DIR) +else() + get_target_property(PYTHON_LIBS asapo_worker BINARY_DIR) +endif() + +add_script_test("${TARGET_NAME}" ${PYTHON_LIBS} nomem) +configure_file(worker_api.py worker_api.py COPYONLY) + diff --git a/tests/automatic/worker/worker_api_python/check_linux.sh b/tests/automatic/worker/worker_api_python/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..5cc70052861184168f04b8a0a33fd64668431185 --- /dev/null +++ b/tests/automatic/worker/worker_api_python/check_linux.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +source_path=dummy +database_name=test_run +token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= +set -e + + + +trap Cleanup EXIT + +Cleanup() { + set +e + nomad stop nginx + nomad stop discovery + nomad stop broker + echo "db.dropDatabase()" | mongo ${database_name} +} + +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd + +for i in `seq 1 5`; +do + echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0})' | mongo ${database_name} +done + +sleep 1 + +export PYTHONPATH=$1:${PYTHONPATH} + +python worker_api.py 127.0.0.1:8400 $source_path $database_name $token_test_run + + + + + diff --git a/tests/automatic/worker/worker_api_python/check_windows.bat b/tests/automatic/worker/worker_api_python/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..66594fcb605e48ccbfb9d0f09bbb8ffad4970330 --- /dev/null +++ b/tests/automatic/worker/worker_api_python/check_windows.bat @@ -0,0 +1,29 @@ +SET source_path=dummy +SET database_name=test_run + +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= + +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 10 -w 100 > nul + +for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0}) | %mongo_exe% %database_name% || goto :error + +set PYTHONPATH=%1 + +python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx +echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/tests/automatic/worker/worker_api_python/worker_api.py b/tests/automatic/worker/worker_api_python/worker_api.py new file mode 100644 index 0000000000000000000000000000000000000000..9e94106e681ac69a3d29ae22b4a9bdd6dc7c08ef --- /dev/null +++ b/tests/automatic/worker/worker_api_python/worker_api.py @@ -0,0 +1,72 @@ +from __future__ import print_function + +import asapo_worker +import json +import sys + +def assert_err(err,name): + if err != None: + print (name + ' err: ', err) + sys.exit(1) + +def assert_noterr(err,name): + if err == None: + print (name + ' err: ', err) + sys.exit(1) + + +def assert_metaname(meta,compare,name): + if meta['name'] != compare: + print ("error at "+name) + print ('meta: ', json.dumps(meta, indent=4, sort_keys=True)) + sys.exit(1) + +def assert_eq(val,expected,name): + if val != expected: + print ("error at "+name) + print ('val: ', val,' expected: ',expected) + sys.exit(1) + + +source, path, beamtime, token = sys.argv[1:] + +broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000) + +group_id_new, err = broker.generate_group_id() +assert_err(err,"generate_group") + +_, meta, err = broker.get_next(group_id_new, meta_only=True) +assert_err(err,"get_next") +assert_metaname(meta,"1","get next1") + +_, meta, err = broker.get_next(group_id_new, meta_only=True) +assert_err(err,"get_next2") +assert_metaname(meta,"2","get next2") + +_, meta, err = broker.get_last(group_id_new, meta_only=True) +assert_err(err,"get_last1") +assert_metaname(meta,"5","get last1") + +_, meta, err = broker.get_next(group_id_new, meta_only=True) +assert_noterr(err,"get_next3") + +size,err = broker.get_ndatasets() +assert_err(err,"get_ndatasets") +assert_eq(size,5,"get_ndatasets") + + +err = broker.reset_counter(group_id_new) +assert_err(err,"reset_counter") + +_, meta, err = broker.get_next(group_id_new, meta_only=True) +assert_err(err,"get_next4") +assert_metaname(meta,"1","get next4") + + +_, meta, err = broker.get_by_id(3, group_id_new, meta_only=True) +assert_err(err,"get_by_id") +assert_metaname(meta,"3","get get_by_id") + +_, meta, err = broker.get_next(group_id_new, meta_only=True) +assert_err(err,"get_next5") +assert_metaname(meta,"4","get next5") diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 042f854785b68780a7f11b4047526d68d4ce5f78..ea4123b244d17dfc2250e4ae3dd0dd9a4985f2d6 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -74,7 +74,7 @@ class DataBroker { \param data - where to store image data. Can be set to nullptr only image metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetById(uint64_t id, FileInfo* info, FileData* data) = 0; + virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) = 0; //! Receive last available image. diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index 629d036896b78af63ce1de3ddd56de8af59657d2..845c0d4cdb2e7121de26f3b804edc7f8556a8de7 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -89,7 +89,7 @@ uint64_t FolderDataBroker::GetNDataSets(Error* err) { return filelist_.size(); } -Error FolderDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) { +Error FolderDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) { return GetFileByIndex(id - 1 , info, data); } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index 8c06cde2b1324ffb59c62156bba76a9efb5f0476..4c4ec17705ca5c4eb6f4741b146c875ecd462d28 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -21,7 +21,7 @@ class FolderDataBroker final : public asapo::DataBroker { std::string GenerateNewGroupId(Error* err) override; // return "0" always and no error - no group ids for folder datra broker uint64_t GetNDataSets(Error* err) override; - Error GetById(uint64_t id, FileInfo* info, FileData* data) override; + Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch private: std::string base_path_; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 6435c693175f8c4d93a07a621b27e0bdfbca5b70..836b39af7e00f2c6e3c0e813d94b8dae84286544 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -252,8 +252,8 @@ uint64_t ServerDataBroker::GetNDataSets(Error* err) { return size; } -Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) { - std::string request_string = "database/" + source_name_ + "/" + std::to_string(id); +Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) { + std::string request_string = "database/" + source_name_ + "/" + std::move(group_id)+ "/" + std::to_string(id); std::string extra_params = "&reset=true"; Error err; auto responce = BrokerRequestWithTimeout(request_string, extra_params, false, &err); diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 9a3eeb9bdcf0af9f106a61676bd22a6b51011a00..a44035d5899cfba98a1c727bbbc51da602e34376 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -24,7 +24,7 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; std::string GenerateNewGroupId(Error* err) override; uint64_t GetNDataSets(Error* err) override; - Error GetById(uint64_t id, FileInfo* info, FileData* data) override; + Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index dc415e3ccf0e4f1e1da96b6a94e2c0685a0bc7bb..c7eb31500960fdcf98b3cb4586a4e9ea323182e5 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -307,7 +307,7 @@ TEST_F(FolderDataBrokerTests, GetByIdReturnsFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetById(1, &fi, nullptr); + auto err = data_broker->GetById(1, &fi,"",nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("1")); @@ -319,8 +319,8 @@ TEST_F(FolderDataBrokerTests, GetByIdReturnsError) { data_broker->Connect(); FileInfo fi; - auto err1 = data_broker->GetById(0, &fi, nullptr); - auto err2 = data_broker->GetById(10, &fi, nullptr); + auto err1 = data_broker->GetById(0, &fi,"", nullptr); + auto err2 = data_broker->GetById(10, &fi,"", nullptr); ASSERT_THAT(err1, Ne(nullptr)); ASSERT_THAT(err2, Ne(nullptr)); diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 68ad645f4dfc7f3cb1001f9999bf4edbc3a74a82..63034e5ce75568219b5c0c5a7268ec12164773ec 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -454,7 +454,8 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { auto to_send = CreateFI(); auto json = to_send.Json(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string( + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + + "/" + std::to_string( expected_dataset_id) + "?token=" + expected_token+"&reset=true", _, _)).WillOnce(DoAll( @@ -462,7 +463,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(json))); - auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); + auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(info.name, Eq(to_send.name)); @@ -475,7 +476,8 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsNoData) { auto to_send = CreateFI(); auto json = to_send.Json(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string( + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + + "/" + std::to_string( expected_dataset_id) + "?token=" + expected_token+"&reset=true", _, _)).WillOnce(DoAll( @@ -483,7 +485,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsNoData) { SetArgPointee<2>(nullptr), Return("{\"id\":1}"))); - auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); + auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); ASSERT_THAT(err->GetErrorType(), Eq(asapo::ErrorType::kEndOfFile)); diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd index 025c84eeee84604225cb8d4abc751cfb902f182d..05e8d826b874fc2c0e3cf64da38211e673840dc2 100644 --- a/worker/api/python/asapo_worker.pxd +++ b/worker/api/python/asapo_worker.pxd @@ -30,6 +30,9 @@ cdef extern from "asapo_worker.h" namespace "asapo": void SetTimeout(uint64_t timeout_ms) Error GetNext(FileInfo* info, string group_id, FileData* data) Error GetLast(FileInfo* info, string group_id, FileData* data) + Error GetById(uint64_t id, FileInfo* info, string group_id, FileData* data) + uint64_t GetNDataSets(Error* err) + Error ResetCounter(string group_id) string GenerateNewGroupId(Error* err) cdef extern from "asapo_worker.h" namespace "asapo": diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in index 2bb14cdedb4d813d3de1d0fab71f714dbf62fa2b..38bc41ec9a491e314605f02881287690cb3b9dfa 100644 --- a/worker/api/python/asapo_worker.pyx.in +++ b/worker/api/python/asapo_worker.pyx.in @@ -26,7 +26,7 @@ cdef bytes _bytes(s): cdef class PyDataBroker: cdef DataBroker* c_broker - def _op(self, op, group_id, meta_only): + def _op(self, op, group_id, meta_only,id): cdef FileInfo info cdef FileData data cdef Error err @@ -35,6 +35,8 @@ cdef class PyDataBroker: err = self.c_broker.GetNext(&info, _bytes(group_id), <FileData*>NULL if meta_only else &data) elif op == "last": err = self.c_broker.GetLast(&info,_bytes(group_id), <FileData*>NULL if meta_only else &data) + elif op == "id": + err = self.c_broker.GetById(id, &info,_bytes(group_id), <FileData*>NULL if meta_only else &data) err_str = _str(GetErrorString(&err)) if err_str.strip(): return None,None,err_str @@ -50,9 +52,27 @@ cdef class PyDataBroker: arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) return arr,meta,None def get_next(self, group_id, meta_only = True): - return self._op("next",group_id,meta_only) + return self._op("next",group_id,meta_only,0) def get_last(self, group_id, meta_only = True): - return self._op("last",group_id,meta_only) + return self._op("last",group_id,meta_only,0) + def get_by_id(self,id,group_id,meta_only = True): + return self._op("id",group_id,meta_only,id) + def get_ndatasets(self): + cdef Error err + size = self.c_broker.GetNDataSets(&err) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return None,err_str + else: + return size,None + def reset_counter(self,group_id): + cdef Error err + err = self.c_broker.ResetCounter(_bytes(group_id)) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return err_str + else: + return None def generate_group_id(self): cdef Error err cdef string group_id