Skip to content
Snippets Groups Projects
Commit 108f3187 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

implemented retrieving data from given fileinfo

parent f8bbcb01
No related branches found
No related tags found
No related merge requests found
Showing
with 138 additions and 25 deletions
......@@ -13,6 +13,7 @@ Cleanup() {
nomad stop discovery
nomad stop broker
echo "db.dropDatabase()" | mongo ${database_name}
rm -f 1_1 1
}
......@@ -24,9 +25,12 @@ sleep 1
for i in `seq 1 10`;
do
echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
done
echo hello1 > 1
$@ 127.0.0.1:8400 $database_name $token_test_run single
#check datasets
......@@ -39,10 +43,12 @@ do
images=''
for j in `seq 1 3`;
do
images="$images,{"_id":$j,"size":100,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}"
images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}"
done
images=${images#?}
echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name}
done
echo hello1 > 1_1
$@ 127.0.0.1:8400 $database_name $token_test_run datasets
......@@ -10,13 +10,18 @@ c:\opt\consul\nomad run nginx.nmd
ping 1.0.0.0 -n 10 -w 100 > nul
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error
echo hello1 > 1
%1 127.0.0.1:8400 %database_name% %token_test_run% single || goto :error
echo db.dropDatabase() | %mongo_exe% %database_name%
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":100,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":100,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":100,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":100,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":100,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error
echo hello1 > 1_1
%1 127.0.0.1:8400 %database_name% %token_test_run% datasets || goto :error
......@@ -31,4 +36,5 @@ c:\opt\consul\nomad stop discovery
c:\opt\consul\nomad stop broker
c:\opt\consul\nomad stop nginx
echo db.dropDatabase() | %mongo_exe% %database_name%
del "1 1_1"
......@@ -42,6 +42,12 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str
M_AssertTrue(fi.name == "1", "GetNext filename");
M_AssertTrue(fi.metadata == "{\"test\":10}", "GetNext metadata");
asapo::FileData data;
err = broker->RetrieveData(&fi,&data);
M_AssertTrue(err == nullptr, "RetrieveData no error");
M_AssertEq("hello1",std::string(reinterpret_cast<char*>(data.get())));
err = broker->GetLast(&fi, group_id, nullptr);
M_AssertTrue(err == nullptr, "GetLast no error");
M_AssertTrue(fi.name == "10", "GetLast filename");
......@@ -115,6 +121,12 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st
M_AssertTrue(dataset.content[2].name == "1_3", "GetNextDataSet filename");
M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetNext metadata");
asapo::FileData data;
err = broker->RetrieveData(&dataset.content[0],&data);
M_AssertTrue(err == nullptr, "RetrieveData no error");
M_AssertEq("hello1",std::string(reinterpret_cast<char*>(data.get())));
dataset = broker->GetLastDataset(group_id, &err);
M_AssertTrue(err == nullptr, "GetLast no error");
M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename");
......@@ -134,7 +146,7 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st
void TestAll(const Args& args) {
asapo::Error err;
auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err);
auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", args.run_name, args.token, &err);
broker->SetTimeout(100);
auto group_id = broker->GenerateNewGroupId(&err);
......
#!/usr/bin/env bash
source_path=dummy
source_path=.
database_name=test_run
token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
set -e
......@@ -15,15 +15,20 @@ Cleanup() {
nomad stop discovery
nomad stop broker
echo "db.dropDatabase()" | mongo ${database_name}
rm 1 1_1
}
nomad run nginx.nmd
nomad run discovery.nmd
nomad run broker.nmd
echo hello1 > 1
echo hello1 > 1_1
for i in `seq 1 5`;
do
echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
done
sleep 1
......@@ -43,7 +48,7 @@ do
images=''
for j in `seq 1 3`;
do
images="$images,{"_id":$j,"size":100,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}"
images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}"
done
images=${images#?}
echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name}
......
SET source_path=dummy
SET source_path=.
SET database_name=test_run
SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
......@@ -10,15 +10,19 @@ c:\opt\consul\nomad run nginx.nmd
ping 1.0.0.0 -n 10 -w 100 > nul
for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error
for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error
set PYTHONPATH=%1
echo hello1 > 1
echo hello1 > 1_1
python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% single || goto :error
echo db.dropDatabase() | %mongo_exe% %database_name%
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":100,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":100,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":100,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error
python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% datasets || goto :error
......@@ -34,3 +38,4 @@ c:\opt\consul\nomad stop discovery
c:\opt\consul\nomad stop broker
c:\opt\consul\nomad stop nginx
echo db.dropDatabase() | %mongo_exe% %database_name%
del "1 1_1"
......@@ -41,6 +41,9 @@ def check_single(broker,group_id_new):
assert_metaname(meta,"1","get next1")
assert_usermetadata(meta,"get next1")
data, err = broker.retrieve_data(meta)
assert_eq(data.tostring(),"hello1","retrieve_data data")
assert_noterr(err, "retrieve_data err")
_, meta, err = broker.get_next(group_id_new, meta_only=True)
assert_noterr(err, "get_next2")
......@@ -109,6 +112,11 @@ def check_dataset(broker,group_id_new):
assert_metaname(metas[1],"1_2","get nextdataset1 name2")
assert_usermetadata(metas[0],"get nextdataset1 meta")
data, err = broker.retrieve_data(metas[0])
assert_eq(data.tostring(),"hello1","retrieve_data from dataset data")
assert_noterr(err, "retrieve_data from dataset err")
id, metas, err = broker.get_next_dataset(group_id_new)
assert_noterr(err, "get_next_dataset2")
assert_eq(id,2,"get_next_dataset2")
......
......@@ -55,7 +55,13 @@ class DataBroker {
*/
virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0;
//! Retrieves image using fileinfo.
/*!
\param info - image metadata to use, can be updated after operation
\param data - where to store image data. Can be set to nullptr only image metadata is needed.
\return Error if data is nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error RetrieveData(FileInfo* info, FileData* data) = 0;
//! Receive next available completed dataset.
......
......@@ -44,22 +44,31 @@ Error FolderDataBroker::CanGetData(FileInfo* info, FileData* data, uint64_t nfil
return nullptr;
}
Error FolderDataBroker::RetrieveData(FileInfo* info, FileData* data) {
if (data == nullptr || info == nullptr ) {
return TextError("pointers are empty");
}
Error error;
*data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error);
return error;
}
Error FolderDataBroker::GetFileByIndex(uint64_t nfile_to_get, FileInfo* info, FileData* data) {
auto err = CanGetData(info, data, nfile_to_get);
if (err != nullptr) {
return err;
}
*info = filelist_[(size_t)nfile_to_get];
*info = filelist_[(size_t) nfile_to_get];
if (data == nullptr) {
return nullptr;
}
Error error;
*data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error);
return error;
return RetrieveData(info, data);
}
......@@ -117,4 +126,5 @@ DataSet FolderDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro
return {0, FileInfos{}};
}
}
......@@ -28,7 +28,7 @@ class FolderDataBroker final : public asapo::DataBroker {
DataSet GetNextDataset(std::string group_id, Error* err) override;
DataSet GetLastDataset(std::string group_id, Error* err) override;
DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override;
Error RetrieveData(FileInfo* info, FileData* data) override;
private:
std::string base_path_;
bool is_connected_;
......
......@@ -191,9 +191,9 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t
return GetDataIfNeeded(info, data);
}
Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) {
if (data == nullptr) {
return nullptr;
Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) {
if (data == nullptr || info == nullptr ) {
return TextError("pointers are empty");
}
if (DataCanBeInBuffer(info)) {
......@@ -209,6 +209,16 @@ Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) {
return error;
}
Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) {
if (data == nullptr) {
return nullptr;
}
return RetrieveData(info,data);
}
bool ServerDataBroker::DataCanBeInBuffer(const FileInfo* info) {
return info->buf_id > 0;
}
......@@ -374,4 +384,5 @@ DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro
return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), err);
}
}
......@@ -41,6 +41,7 @@ class ServerDataBroker final : public asapo::DataBroker {
DataSet GetNextDataset(std::string group_id, Error* err) override;
DataSet GetLastDataset(std::string group_id, Error* err) override;
DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override;
Error RetrieveData(FileInfo* info, FileData* data) override;
std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch
std::unique_ptr<HttpClient> httpclient__;
......
......@@ -156,7 +156,6 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsFileInfo) {
}
TEST_F(FolderDataBrokerTests, GetNDataSets) {
data_broker->Connect();
Error err;
......@@ -278,6 +277,32 @@ TEST_F(GetDataFromFileTests, GetNextReturnsDataAndInfo) {
}
TEST_F(GetDataFromFileTests, RetrieveDataCallsReadsFile) {
data_broker->Connect();
FileInfo fi;
fi.name = "test";
EXPECT_CALL(mock, GetDataFromFile_t(expected_base_path+asapo::kPathSeparator+"test", _, _)).
WillOnce(DoAll(testing::SetArgPointee<2>(nullptr), testing::Return(new uint8_t[1] {'1'})));
auto err = data_broker->RetrieveData(&fi, &data);
ASSERT_THAT(data[0], Eq('1'));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(GetDataFromFileTests, RetrieveDataReturnsErrorWithEmptyPointer) {
data_broker->Connect();
auto err = data_broker->RetrieveData(&fi, nullptr);
ASSERT_THAT(err, Ne(nullptr));
}
TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotReadData) {
EXPECT_CALL(mock, GetDataFromFile_t(_, _, _)).
WillOnce(DoAll(testing::SetArgPointee<2>(asapo::IOErrorTemplates::kReadError.Generate().release()),
......
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp cimport bool
ctypedef unsigned char uint8_t
......@@ -22,6 +23,7 @@ cdef extern from "asapo_worker.h" namespace "asapo":
cdef extern from "asapo_worker.h" namespace "asapo":
cppclass FileInfo:
string Json()
bool SetFromJson(string json_str)
cppclass FileInfos:
vector[FileInfo].iterator begin()
vector[FileInfo].iterator end()
......@@ -45,6 +47,7 @@ cdef extern from "asapo_worker.h" namespace "asapo":
DataSet GetNextDataset(string group_id, Error* err)
DataSet GetLastDataset(string group_id, Error* err)
DataSet GetDatasetById(uint64_t id,string group_id, Error* err)
Error RetrieveData(FileInfo* info, FileData* data)
cdef extern from "asapo_worker.h" namespace "asapo":
......
......@@ -42,9 +42,6 @@ cdef class PyDataBroker:
return None,None,err_str
info_str = _str(info.Json())
meta = json.loads(info_str)
del meta['buf_id']
del meta['source']
del meta['lastchange']
if meta_only:
return None,meta,None
cdef char* ptr = <char*> data.release()
......@@ -57,6 +54,24 @@ cdef class PyDataBroker:
return self._op("last",group_id,meta_only,0)
def get_by_id(self,id,group_id,meta_only = True):
return self._op("id",group_id,meta_only,id)
def retrieve_data(self,meta):
json_str = json.dumps(meta)
cdef FileInfo info
if not info.SetFromJson(_bytes(json_str)):
return None,"wrong metadata"
cdef Error err
cdef FileData data
err = self.c_broker.RetrieveData(&info, &data)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None,err_str
cdef np.npy_intp dims[1]
dims[0] = meta['size']
cdef char* ptr = <char*> data.release()
arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
return arr,None
def get_ndatasets(self):
cdef Error err
size = self.c_broker.GetNDataSets(&err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment