Commit 4322d2df authored by Sergey Yakubov

substreams in Python producer API, test

parent 382b47ed
@@ -7,6 +7,7 @@ IMPROVEMENTS
* receiver uses ASAP3 directory structure to save files to
* API documentation is available at [asapo-docs.desy.de](asapo-docs.desy.de)
* switch to using cmake 3.7+
* error messages in Python as Python strings, not byte objects
BUG FIXES
* consumer operation timeout - take duration of the operation into account
\ No newline at end of file
@@ -72,6 +72,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil:
cdef extern from "asapo_consumer.h" namespace "asapo":
ErrorTemplateInterface kNoData "asapo::ConsumerErrorTemplates::kNoData"
ErrorTemplateInterface kEndOfStream "asapo::ConsumerErrorTemplates::kEndOfStream"
ErrorTemplateInterface kStreamFinished "asapo::ConsumerErrorTemplates::kStreamFinished"
ErrorTemplateInterface kUnavailableService "asapo::ConsumerErrorTemplates::kUnavailableService"
ErrorTemplateInterface kInterruptedTransaction "asapo::ConsumerErrorTemplates::kInterruptedTransaction"
ErrorTemplateInterface kLocalIOError "asapo::ConsumerErrorTemplates::kLocalIOError"
@@ -79,4 +80,5 @@ cdef extern from "asapo_consumer.h" namespace "asapo":
cdef cppclass ConsumerErrorData:
uint64_t id
uint64_t id_max
string next_substream
@@ -37,12 +37,22 @@ class AsapoWrongInputError(AsapoConsumerError):
class AsapoInterruptedTransactionError(AsapoConsumerError):
pass
class AsapoStreamFinishedError(AsapoConsumerError):
pass
class AsapoUnavailableServiceError(AsapoConsumerError):
pass
class AsapoLocalIOError(AsapoConsumerError):
pass
class AsapoStreamFinishedError(AsapoConsumerError):
def __init__(self,message,id_max=None,next_substream=None):
AsapoConsumerError.__init__(self,message)
self.id_max = id_max
self.next_substream = _str(next_substream)
class AsapoEndOfStreamError(AsapoConsumerError):
def __init__(self,message,id_max=None):
AsapoConsumerError.__init__(self,message)
@@ -63,6 +73,12 @@ cdef throw_exception(Error& err):
raise AsapoEndOfStreamError(error_string,data.id_max)
else:
raise AsapoEndOfStreamError(error_string)
elif err == kStreamFinished:
data=<ConsumerErrorData*>err.get().GetCustomData()
if data != NULL:
raise AsapoStreamFinishedError(error_string,data.id_max,data.next_substream)
else:
raise AsapoStreamFinishedError(error_string)
elif err == kNoData:
data=<ConsumerErrorData*>err.get().GetCustomData()
if data != NULL:
@@ -81,7 +97,6 @@ cdef throw_exception(Error& err):
raise AsapoConsumerError(error_string)
cdef class PyDataBroker:
_default_stream = "default"
cdef DataBroker* c_broker
def _op(self, op, group_id, substream, meta_only, uint64_t id):
cdef FileInfo info
@@ -111,11 +126,11 @@ cdef class PyDataBroker:
arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
PyArray_ENABLEFLAGS(arr,np.NPY_OWNDATA)
return arr,meta
def get_next(self, group_id, substream = _default_stream, meta_only = True):
def get_next(self, group_id, substream = "default", meta_only = True):
return self._op("next",group_id,substream,meta_only,0)
def get_last(self, group_id, substream = _default_stream, meta_only = True):
def get_last(self, group_id, substream = "default", meta_only = True):
return self._op("last",group_id,substream,meta_only,0)
def get_by_id(self,id,group_id, substream = _default_stream,meta_only = True):
def get_by_id(self,id,group_id, substream = "default",meta_only = True):
return self._op("id",group_id,substream,meta_only,id)
def retrieve_data(self,meta):
json_str = json.dumps(meta)
@@ -134,7 +149,7 @@ cdef class PyDataBroker:
arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
PyArray_ENABLEFLAGS(arr,np.NPY_OWNDATA)
return arr
def get_current_size(self, substream = _default_stream):
def get_current_size(self, substream = "default"):
cdef Error err
cdef uint64_t size
cdef string b_substream = _bytes(substream)
@@ -146,7 +161,7 @@ cdef class PyDataBroker:
return size
def set_timeout(self,timeout):
self.c_broker.SetTimeout(timeout)
def set_lastread_marker(self,value,group_id, substream = _default_stream):
def set_lastread_marker(self,value,group_id, substream = "default"):
cdef string b_group_id = _bytes(group_id)
cdef string b_substream = _bytes(substream)
cdef Error err
@@ -156,7 +171,7 @@ cdef class PyDataBroker:
if err:
throw_exception(err)
return
def reset_lastread_marker(self,group_id, substream = _default_stream):
def reset_lastread_marker(self,group_id, substream = "default"):
cdef string b_group_id = _bytes(group_id)
cdef string b_substream = _bytes(substream)
cdef Error err
@@ -185,7 +200,7 @@ cdef class PyDataBroker:
list.append(_str(substream))
return list
def query_images(self,query, substream = _default_stream):
def query_images(self,query, substream = "default"):
cdef string b_query = _bytes(query)
cdef string b_substream = _bytes(substream)
cdef Error err
@@ -219,11 +234,11 @@ cdef class PyDataBroker:
for fi in dataset.content:
json_list.append(json.loads(_str(fi.Json())))
return dataset.id, json_list
def get_next_dataset(self, group_id, substream = _default_stream):
def get_next_dataset(self, group_id, substream = "default"):
return self._op_dataset("next",group_id,substream,0)
def get_last_dataset(self, group_id, substream = _default_stream):
def get_last_dataset(self, group_id, substream = "default"):
return self._op_dataset("last",group_id,substream,0)
def get_dataset_by_id(self, id, group_id, substream = _default_stream):
def get_dataset_by_id(self, id, group_id, substream = "default"):
return self._op_dataset("id",group_id,substream,id)
def get_beamtime_meta(self):
cdef Error err
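With the kStreamFinished handling above, a consumer can detect that a substream is finished and move on to the successor the producer announced. A minimal sketch of that loop, mirroring the calls used in the test script at the end of this commit (the endpoint, beamtime, token, and substream names are placeholders):

import asapo_consumer

broker = asapo_consumer.create_server_broker("127.0.0.1:8400", ".", "asapo_test", "", "<token>", 10 * 1000)
group_id = broker.generate_group_id()

substream = "substream"
while True:
    try:
        data, meta = broker.get_next(group_id, substream=substream, meta_only=True)
        # process meta here
    except asapo_consumer.AsapoStreamFinishedError as err:
        if not err.next_substream:
            break  # producer announced no successor
        substream = err.next_substream  # continue with the announced substream
    except asapo_consumer.AsapoEndOfStreamError:
        break  # no more data and no finished flag yet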
@@ -89,12 +89,13 @@ cdef extern from "asapo_producer.h" namespace "asapo" nogil:
cppclass Producer:
@staticmethod
unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error)
Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback)
Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback)
Error SendFile(const EventHeader& event_header, string substream, string full_path, uint64_t ingest_mode,RequestCallback callback)
Error SendData__(const EventHeader& event_header, string substream, void* data, uint64_t ingest_mode,RequestCallback callback)
void StopThreads__()
void SetLogLevel(LogLevel level)
uint64_t GetRequestsQueueSize()
Error WaitRequestsFinished(uint64_t timeout_ms)
Error SendSubstreamFinishedFlag(string substream, uint64_t last_id, string next_substream, RequestCallback callback)
cdef extern from "asapo_producer.h" namespace "asapo":
uint64_t kDefaultIngestMode
@@ -48,14 +48,15 @@ class AsapoTimeOutError(AsapoProducerError):
pass
cdef python_exception_from_error(Error& err):
error_string = _str(err.get().Explain())
if err == kTimeout:
return AsapoTimeOutError(err.get().Explain())
return AsapoTimeOutError(error_string)
elif err == kWrongInput:
return AsapoWrongInputError(err.get().Explain())
return AsapoWrongInputError(error_string)
elif err == kLocalIOError:
return AsapoLocalIOError(err.get().Explain())
return AsapoLocalIOError(error_string)
else:
return AsapoProducerError(err.get().Explain())
return AsapoProducerError(error_string)
cdef throw_exception(Error& err):
raise python_exception_from_error(err)
@@ -99,14 +100,14 @@ cdef class PyProducer:
return
self.c_producer.get().SetLogLevel(log_level)
def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,substream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None):
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
event_header.file_id = id
if data is None:
event_header.file_size = 0
else:
event_header.file_size = data.nbytes
err = self.c_producer.get().SendData__(event_header, data_pointer_nparray(data), ingest_mode,
err = self.c_producer.get().SendData__(event_header, _bytes(substream), data_pointer_nparray(data),ingest_mode,
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
<void*>self,<void*>callback, <void*>data))
if err:
@@ -133,10 +134,10 @@ cdef class PyProducer:
event_header.subset_size = subset[1]
return event_header
def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None, substream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None):
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
event_header.file_size = len(data)
err = self.c_producer.get().SendData__(event_header, data_pointer_bytes(data), ingest_mode,
err = self.c_producer.get().SendData__(event_header,_bytes(substream), data_pointer_bytes(data), ingest_mode,
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
<void*>self,<void*>callback, <void*>data))
if err:
@@ -146,7 +147,7 @@ cdef class PyProducer:
Py_XINCREF(<PyObject*>callback)
return
def send_data(self, id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def send_data(self, id, exposed_path, data, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
"""
:param id: unique data id
:type id: int
@@ -167,13 +168,31 @@
AsapoProducerError: actually should not happen
"""
if type(data) == np.ndarray or data == None:
self.__send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
self.__send_np_array(id,exposed_path,data,user_meta,subset,substream,ingest_mode,callback)
elif type(data) == bytes:
self.__send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
self.__send_bytes(id,exposed_path,data,user_meta,subset,substream,ingest_mode,callback)
else:
raise(AsapoProducerError("wrong data type: " + str(type(data))))
def send_substream_finished_flag(self, substream, last_id, next_substream = None, callback = None):
"""
:param substream: substream name
:type substream: string
:param last_id: id of the last record in the substream
:type last_id: int
:param next_substream: name of the next substream or None
:type next_substream: string
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoProducerError: actually should not happen
"""
err = self.c_producer.get().SendSubstreamFinishedFlag(_bytes(substream), last_id,_bytes(next_substream) if next_substream != None else "",
unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
if err:
throw_exception(err)
def send_file(self, id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def send_file(self, id, local_path, exposed_path, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
"""
:param id: unique data id
:type id: int
@@ -197,7 +216,7 @@
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
event_header.file_size = 0
err = self.c_producer.get().SendFile(event_header, _bytes(local_path), ingest_mode,
err = self.c_producer.get().SendFile(event_header, _bytes(substream), _bytes(local_path), ingest_mode,
unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
if err:
throw_exception(err)
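Taken together, the new substream argument to SendData__/SendFile and the SendSubstreamFinishedFlag call let a producer route records to a named substream and then mark it finished, optionally pointing consumers at a successor. A minimal sketch mirroring the test script below (endpoint, beamtime, token, and the "scan1"/"scan2" names are placeholders):

import asapo_producer

def callback(header, err):
    if err is not None:
        print("send failed:", header, err)

producer = asapo_producer.create_producer("127.0.0.1:8400", "asapo_test", "", "<token>", 1)
# metadata-only record routed to substream "scan1"
producer.send_data(1, "file1", None,
                   ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,
                   substream="scan1", callback=callback)
# mark "scan1" finished after record 1 and announce "scan2" as the next substream
producer.send_substream_finished_flag("scan1", 1, next_substream="scan2", callback=callback)
producer.wait_requests_finished(10 * 1000)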
@@ -11,4 +11,5 @@ add_subdirectory(simple_chain_filegen_multisource)
add_subdirectory(simple_chain_filegen_readdata_cache)
add_subdirectory(simple_chain_filegen_readdata_file)
add_subdirectory(simple_chain_dataset)
add_subdirectory(send_recv_substreams)
\ No newline at end of file
add_subdirectory(send_recv_substreams)
add_subdirectory(send_recv_substreams_python)
\ No newline at end of file
set(TARGET_NAME send_recv_substreams_python)
prepare_asapo()
if (UNIX)
get_target_property(PYTHON_LIBS_CONSUMER python-lib BINARY_DIR)
get_target_property(PYTHON_LIBS_PRODUCER python-lib-producer BINARY_DIR)
else()
get_target_property(PYTHON_LIBS_CONSUMER asapo_consumer BINARY_DIR)
get_target_property(PYTHON_LIBS_PRODUCER asapo_producer BINARY_DIR)
endif()
file(TO_NATIVE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/send_recv_substreams.py TEST_SCRIPT )
add_script_test("${TARGET_NAME}" "${Python_EXECUTABLE} ${PYTHON_LIBS_CONSUMER} ${PYTHON_LIBS_PRODUCER} ${TEST_SCRIPT} " nomem)
#!/usr/bin/env bash
source_path=.
beamtime_id=asapo_test
stream_in=detector
indatabase_name=${beamtime_id}_${stream_in}
token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
beamline=test
set -e
trap Cleanup EXIT
Cleanup() {
set +e
nomad stop nginx
nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill
nomad stop discovery
nomad stop broker
nomad stop receiver
nomad stop authorizer
echo "db.dropDatabase()" | mongo ${indatabase_name}
}
nomad run nginx.nmd
nomad run discovery.nmd
nomad run broker.nmd
nomad run receiver.nmd
nomad run authorizer.nmd
export PYTHONPATH=$2:$3:${PYTHONPATH}
$1 $4 127.0.0.1:8400 $beamtime_id $token
\ No newline at end of file
SET source_path=.
SET beamtime_id=asapo_test
SET stream_in=detector
SET indatabase_name=%beamtime_id%_%stream_in%
SET token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
SET beamline=test
SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
c:\opt\consul\nomad run discovery.nmd
c:\opt\consul\nomad run broker.nmd
c:\opt\consul\nomad run nginx.nmd
c:\opt\consul\nomad run receiver.nmd
c:\opt\consul\nomad run authorizer.nmd
"%1" 127.0.0.1:8400 %beamtime_id% %token%
goto :clean
:error
call :clean
exit /b 1
:clean
c:\opt\consul\nomad stop discovery
c:\opt\consul\nomad stop broker
c:\opt\consul\nomad stop nginx
c:\opt\consul\nomad run nginx_kill.nmd && c:\opt\consul\nomad stop -yes -purge nginx_kill
c:\opt\consul\nomad stop receiver
c:\opt\consul\nomad stop authorizer
echo db.dropDatabase() | %mongo_exe% %indatabase_name%
from __future__ import print_function
import asapo_consumer
import asapo_producer
import sys
import os
import threading
lock = threading.Lock()
timeout = 10 * 1000
def assert_eq(val,expected,name):
if val != expected:
print ("error at "+name)
print ('val: ', val,' expected: ',expected)
sys.exit(1)
def callback(header,err):
lock.acquire() # to print
if err is not None:
print("could not sent: ",header,err)
else:
print ("successfuly sent: ",header)
lock.release()
source, beamtime, token = sys.argv[1:]
broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout)
producer = asapo_producer.create_producer(source,beamtime, "", token, 1)
producer.set_log_level("debug")
group_id = broker.generate_group_id()
n_send = 10
for i in range(n_send):
producer.send_data(i+1, "name"+str(i),None,ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,substream = "substream", callback = callback)
producer.send_substream_finished_flag("substream", 10, next_substream = "next_substream", callback = callback)
producer.wait_requests_finished(timeout)
n_recv = 0
substream_finished=False
while True:
try:
data, meta = broker.get_next(group_id,substream = "substream", meta_only=True)
print ("received: ",meta)
n_recv = n_recv + 1
except asapo_consumer.AsapoStreamFinishedError as finished_substream:
substream_finished = True
assert_eq(finished_substream.id_max, 11, "last id")
assert_eq(finished_substream.next_substream, "next_substream", "next substream")
break
assert_eq(n_recv, n_send, "send=recv")
assert_eq(substream_finished, True, "substream finished")