From 108f3187c92e1b3979c6d47d3f0efcc62b05ad5b Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 30 Jul 2019 16:11:45 +0200 Subject: [PATCH] implemented retrieving data from given fileinfo --- .../worker/worker_api/check_linux.sh | 10 +++++-- .../worker/worker_api/check_windows.bat | 10 +++++-- .../worker/worker_api/worker_api.cpp | 14 +++++++++- .../worker/worker_api_python/check_linux.sh | 11 +++++--- .../worker_api_python/check_windows.bat | 11 +++++--- .../worker/worker_api_python/worker_api.py | 8 ++++++ worker/api/cpp/include/worker/data_broker.h | 8 +++++- worker/api/cpp/src/folder_data_broker.cpp | 20 ++++++++++---- worker/api/cpp/src/folder_data_broker.h | 2 +- worker/api/cpp/src/server_data_broker.cpp | 17 +++++++++--- worker/api/cpp/src/server_data_broker.h | 1 + .../api/cpp/unittests/test_folder_broker.cpp | 27 ++++++++++++++++++- worker/api/python/asapo_worker.pxd | 3 +++ worker/api/python/asapo_worker.pyx.in | 21 ++++++++++++--- 14 files changed, 138 insertions(+), 25 deletions(-) diff --git a/tests/automatic/worker/worker_api/check_linux.sh b/tests/automatic/worker/worker_api/check_linux.sh index 713baf3f8..189a40720 100644 --- a/tests/automatic/worker/worker_api/check_linux.sh +++ b/tests/automatic/worker/worker_api/check_linux.sh @@ -13,6 +13,7 @@ Cleanup() { nomad stop discovery nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} + rm -f 1_1 1 } @@ -24,9 +25,12 @@ sleep 1 for i in `seq 1 10`; do - echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done +echo hello1 > 1 + + $@ 127.0.0.1:8400 $database_name $token_test_run single #check datasets @@ -39,10 +43,12 @@ do images='' for j in `seq 1 3`; do - 
images="$images,{"_id":$j,"size":100,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" + images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" done images=${images#?} echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name} done +echo hello1 > 1_1 + $@ 127.0.0.1:8400 $database_name $token_test_run datasets diff --git a/tests/automatic/worker/worker_api/check_windows.bat b/tests/automatic/worker/worker_api/check_windows.bat index 9623bbe2e..cd2012938 100644 --- a/tests/automatic/worker/worker_api/check_windows.bat +++ b/tests/automatic/worker/worker_api/check_windows.bat @@ -10,13 +10,18 @@ c:\opt\consul\nomad run nginx.nmd ping 1.0.0.0 -n 10 -w 100 > nul -for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error + +echo hello1 > 1 + %1 127.0.0.1:8400 %database_name% %token_test_run% single || goto :error echo db.dropDatabase() | %mongo_exe% %database_name% -for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":100,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":100,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":100,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, 
"size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error + +echo hello1 > 1_1 %1 127.0.0.1:8400 %database_name% %token_test_run% datasets || goto :error @@ -31,4 +36,5 @@ c:\opt\consul\nomad stop discovery c:\opt\consul\nomad stop broker c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% +del 1 1_1 diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp index 325dc43be..227e98791 100644 --- a/tests/automatic/worker/worker_api/worker_api.cpp +++ b/tests/automatic/worker/worker_api/worker_api.cpp @@ -42,6 +42,12 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str M_AssertTrue(fi.name == "1", "GetNext filename"); M_AssertTrue(fi.metadata == "{\"test\":10}", "GetNext metadata"); + asapo::FileData data; + err = broker->RetrieveData(&fi,&data); + M_AssertTrue(err == nullptr, "RetrieveData no error"); + M_AssertEq("hello1",std::string(reinterpret_cast<char*>(data.get()), fi.size)); /* explicit length: buffer is not assumed NUL-terminated */ + + err = broker->GetLast(&fi, group_id, nullptr); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(fi.name == "10", "GetLast filename"); @@ -115,6 +121,12 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st M_AssertTrue(dataset.content[2].name == "1_3", "GetNextDataSet filename"); M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetNext metadata"); + asapo::FileData data; + err = broker->RetrieveData(&dataset.content[0],&data); + M_AssertTrue(err == nullptr, "RetrieveData no error"); + M_AssertEq("hello1",std::string(reinterpret_cast<char*>(data.get()), dataset.content[0].size)); /* explicit length: buffer is not assumed NUL-terminated */ + + dataset = broker->GetLastDataset(group_id, &err); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename"); @@ -134,7 +146,7 
@@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st void TestAll(const Args& args) { asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", args.run_name, args.token, &err); broker->SetTimeout(100); auto group_id = broker->GenerateNewGroupId(&err); diff --git a/tests/automatic/worker/worker_api_python/check_linux.sh b/tests/automatic/worker/worker_api_python/check_linux.sh index e81f0feb5..c1a3b854a 100644 --- a/tests/automatic/worker/worker_api_python/check_linux.sh +++ b/tests/automatic/worker/worker_api_python/check_linux.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source_path=dummy +source_path=. database_name=test_run token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= set -e @@ -15,15 +15,20 @@ Cleanup() { nomad stop discovery nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} + rm 1 1_1 } nomad run nginx.nmd nomad run discovery.nmd nomad run broker.nmd +echo hello1 > 1 +echo hello1 > 1_1 + + for i in `seq 1 5`; do - echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done sleep 1 @@ -43,7 +48,7 @@ do images='' for j in `seq 1 3`; do - images="$images,{"_id":$j,"size":100,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" + images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" done images=${images#?} echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name} diff --git a/tests/automatic/worker/worker_api_python/check_windows.bat b/tests/automatic/worker/worker_api_python/check_windows.bat 
index 19d04a571..ff3442c76 100644 --- a/tests/automatic/worker/worker_api_python/check_windows.bat +++ b/tests/automatic/worker/worker_api_python/check_windows.bat @@ -1,4 +1,4 @@ -SET source_path=dummy +SET source_path=. SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" @@ -10,15 +10,19 @@ c:\opt\consul\nomad run nginx.nmd ping 1.0.0.0 -n 10 -w 100 > nul -for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error set PYTHONPATH=%1 +echo hello1 > 1 +echo hello1 > 1_1 + + python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% single || goto :error echo db.dropDatabase() | %mongo_exe% %database_name% -for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":100,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":100,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":100,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% datasets || goto :error @@ -34,3 +38,4 @@ c:\opt\consul\nomad stop discovery 
c:\opt\consul\nomad stop broker c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% +del 1 1_1 diff --git a/tests/automatic/worker/worker_api_python/worker_api.py b/tests/automatic/worker/worker_api_python/worker_api.py index 3515a0d55..9af8f0f52 100644 --- a/tests/automatic/worker/worker_api_python/worker_api.py +++ b/tests/automatic/worker/worker_api_python/worker_api.py @@ -41,6 +41,9 @@ def check_single(broker,group_id_new): assert_metaname(meta,"1","get next1") assert_usermetadata(meta,"get next1") + data, err = broker.retrieve_data(meta) + assert_noterr(err, "retrieve_data err") + assert_eq(data.tostring(),"hello1","retrieve_data data") _, meta, err = broker.get_next(group_id_new, meta_only=True) assert_noterr(err, "get_next2") @@ -109,6 +112,11 @@ def check_dataset(broker,group_id_new): assert_metaname(metas[1],"1_2","get nextdataset1 name2") assert_usermetadata(metas[0],"get nextdataset1 meta") + data, err = broker.retrieve_data(metas[0]) + assert_noterr(err, "retrieve_data from dataset err") + assert_eq(data.tostring(),"hello1","retrieve_data from dataset data") + + id, metas, err = broker.get_next_dataset(group_id_new) assert_noterr(err, "get_next_dataset2") assert_eq(id,2,"get_next_dataset2") diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index f1f7d4ae7..4d6fbac02 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -55,7 +55,13 @@ class DataBroker { */ virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0; - + //! Retrieves image data using fileinfo. + /*! + \param info - image metadata to use, can be updated after operation. Must not be nullptr. + \param data - where to store image data. Must not be nullptr. + \return Error if info or data is nullptr or data cannot be read, nullptr otherwise. + */ + virtual Error RetrieveData(FileInfo* info, FileData* data) = 0; //! 
Receive next available completed dataset. diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index fcdaa266c..cdc0fb1fa 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -44,22 +44,31 @@ Error FolderDataBroker::CanGetData(FileInfo* info, FileData* data, uint64_t nfil return nullptr; } + +Error FolderDataBroker::RetrieveData(FileInfo* info, FileData* data) { + if (data == nullptr || info == nullptr ) { + return TextError("pointers are empty"); + } + + Error error; + *data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error); + return error; +} + + Error FolderDataBroker::GetFileByIndex(uint64_t nfile_to_get, FileInfo* info, FileData* data) { auto err = CanGetData(info, data, nfile_to_get); if (err != nullptr) { return err; } - *info = filelist_[(size_t)nfile_to_get]; + *info = filelist_[(size_t) nfile_to_get]; if (data == nullptr) { return nullptr; } - Error error; - *data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error); - - return error; + return RetrieveData(info, data); } @@ -117,4 +126,5 @@ DataSet FolderDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro return {0, FileInfos{}}; } + } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index aa8dd533c..372ef2d51 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -28,7 +28,7 @@ class FolderDataBroker final : public asapo::DataBroker { DataSet GetNextDataset(std::string group_id, Error* err) override; DataSet GetLastDataset(std::string group_id, Error* err) override; DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override; - + Error RetrieveData(FileInfo* info, FileData* data) override; private: std::string base_path_; bool is_connected_; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp 
index 09f7c23fd..4b69c6cbf 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -191,9 +191,9 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t return GetDataIfNeeded(info, data); } -Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { - if (data == nullptr) { - return nullptr; +Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { + if (data == nullptr || info == nullptr ) { + return TextError("pointers are empty"); } if (DataCanBeInBuffer(info)) { @@ -209,6 +209,16 @@ Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { return error; } + +Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { + if (data == nullptr) { + return nullptr; + } + + return RetrieveData(info,data); + +} + bool ServerDataBroker::DataCanBeInBuffer(const FileInfo* info) { return info->buf_id > 0; } @@ -374,4 +384,5 @@ DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), err); } + } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index abf65776e..3676878ba 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -41,6 +41,7 @@ class ServerDataBroker final : public asapo::DataBroker { DataSet GetNextDataset(std::string group_id, Error* err) override; DataSet GetLastDataset(std::string group_id, Error* err) override; DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override; + Error RetrieveData(FileInfo* info, FileData* data) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index c302131f9..668e53dbb 
100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -156,7 +156,6 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsFileInfo) { } - TEST_F(FolderDataBrokerTests, GetNDataSets) { data_broker->Connect(); Error err; @@ -278,6 +277,32 @@ TEST_F(GetDataFromFileTests, GetNextReturnsDataAndInfo) { } + +TEST_F(GetDataFromFileTests, RetrieveDataCallsReadsFile) { + data_broker->Connect(); + FileInfo fi; + fi.name = "test"; + + + EXPECT_CALL(mock, GetDataFromFile_t(expected_base_path+asapo::kPathSeparator+"test", _, _)). + WillOnce(DoAll(testing::SetArgPointee<2>(nullptr), testing::Return(new uint8_t[1] {'1'}))); + + auto err = data_broker->RetrieveData(&fi, &data); + + ASSERT_THAT(data[0], Eq('1')); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(GetDataFromFileTests, RetrieveDataReturnsErrorWithEmptyPointer) { + data_broker->Connect(); + + auto err = data_broker->RetrieveData(&fi, nullptr); + + ASSERT_THAT(err, Ne(nullptr)); +} + + + TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotReadData) { EXPECT_CALL(mock, GetDataFromFile_t(_, _, _)). 
WillOnce(DoAll(testing::SetArgPointee<2>(asapo::IOErrorTemplates::kReadError.Generate().release()), diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd index 216266eb0..de9231176 100644 --- a/worker/api/python/asapo_worker.pxd +++ b/worker/api/python/asapo_worker.pxd @@ -1,6 +1,7 @@ from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.vector cimport vector +from libcpp cimport bool ctypedef unsigned char uint8_t @@ -22,6 +23,7 @@ cdef extern from "asapo_worker.h" namespace "asapo": cdef extern from "asapo_worker.h" namespace "asapo": cppclass FileInfo: string Json() + bool SetFromJson(string json_str) cppclass FileInfos: vector[FileInfo].iterator begin() vector[FileInfo].iterator end() @@ -45,6 +47,7 @@ cdef extern from "asapo_worker.h" namespace "asapo": DataSet GetNextDataset(string group_id, Error* err) DataSet GetLastDataset(string group_id, Error* err) DataSet GetDatasetById(uint64_t id,string group_id, Error* err) + Error RetrieveData(FileInfo* info, FileData* data) cdef extern from "asapo_worker.h" namespace "asapo": diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in index bd7efaa90..9eb871abf 100644 --- a/worker/api/python/asapo_worker.pyx.in +++ b/worker/api/python/asapo_worker.pyx.in @@ -42,9 +42,6 @@ cdef class PyDataBroker: return None,None,err_str info_str = _str(info.Json()) meta = json.loads(info_str) - del meta['buf_id'] - del meta['source'] - del meta['lastchange'] if meta_only: return None,meta,None cdef char* ptr = <char*> data.release() @@ -57,6 +54,24 @@ cdef class PyDataBroker: return self._op("last",group_id,meta_only,0) def get_by_id(self,id,group_id,meta_only = True): return self._op("id",group_id,meta_only,id) + def retrieve_data(self,meta): + json_str = json.dumps(meta) + cdef FileInfo info + if not info.SetFromJson(_bytes(json_str)): + return None,"wrong metadata" + cdef Error err + cdef FileData data + err = 
self.c_broker.RetrieveData(&info, &data) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return None,err_str + cdef np.npy_intp dims[1] + dims[0] = meta['size'] + cdef char* ptr = <char*> data.release() + arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) + return arr,None + + def get_ndatasets(self): cdef Error err size = self.c_broker.GetNDataSets(&err) -- GitLab