diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index a35d824a1290d563dd42f1787addf8d4b6ca820f..3dcbdd19dbb2c239653db22ae61d2fd052a7fd22 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -12,7 +12,7 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef extern from "asapo_wrappers.h" namespace "asapo": - cdef string GetErrorString(Error* err) + string GetErrorString(Error* err) cdef extern from "asapo_producer.h" namespace "asapo": @@ -23,8 +23,7 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef extern from "asapo_producer.h" namespace "asapo": cppclass RequestHandlerType: pass - cdef RequestHandlerType RequestHandlerType_Tcp "asapo::RequestHandlerType::kTcp" - + RequestHandlerType RequestHandlerType_Tcp "asapo::RequestHandlerType::kTcp" cdef extern from "asapo_producer.h" namespace "asapo": @@ -34,6 +33,42 @@ cdef extern from "asapo_producer.h" namespace "asapo": string user_token cdef extern from "asapo_producer.h" namespace "asapo": - cdef cppclass Producer: + struct EventHeader: + uint64_t file_id + uint64_t file_size + string file_name + string user_metadata + uint64_t subset_id + uint64_t subset_size + +cdef extern from "asapo_producer.h" namespace "asapo": + struct EventHeader: + uint64_t file_id + uint64_t file_size + string file_name + string user_metadata + uint64_t subset_id + uint64_t subset_size + +cdef extern from "asapo_producer.h" namespace "asapo": + struct GenericRequestHeader: + pass + +cdef extern from "asapo_producer.h" namespace "asapo": + cppclass RequestCallback: + pass + + +cdef extern from "asapo_wrappers.h" namespace "asapo": + cdef cppclass function_wrapper: + ctypedef void (*cy_callback) (void*, GenericRequestHeader, Error) + @staticmethod + RequestCallback make_std_function(cy_callback, void*) + + +cdef extern from "asapo_producer.h" namespace "asapo": + cppclass Producer: @staticmethod unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error) + Error SendFile(const EventHeader& event_header, string full_path, uint64_t injest_mode,RequestCallback callback) + diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index ca839ce1218d0575e4200d0f522494c424c956f0..4ef65b11312c3f330fec63d05d5717443392cc53 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -27,10 +27,26 @@ cdef bytes _bytes(s): else: raise TypeError("Could not convert to unicode.") +ctypedef void (*cb_type)(void*, GenericRequestHeader, Error) + + cdef class PyProducer: cdef unique_ptr[Producer] c_producer + cdef object py_callback + def send_file(self,string fname): + cdef EventHeader event_header + err = self.c_producer.get().SendFile(event_header, _bytes(fname), 1, + function_wrapper.make_std_function(<cb_type>self.c_callback, <void*>self)) + cdef err_str = GetErrorString(&err) + if err_str.strip(): + return err_str + else: + return "ok" + cdef void c_callback(self,GenericRequestHeader header, Error err) with gil: + self.py_callback(1,2) + return @staticmethod - def create_producer(endpoint,beamtime_id,stream,token,nthreads): + def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback): pyProd = PyProducer() cdef Error err cdef SourceCredentials source @@ -38,14 +54,15 @@ cdef class PyProducer: source.user_token = token source.stream = stream pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,&err) + pyProd.py_callback = py_callback cdef err_str = GetErrorString(&err) if err_str.strip(): return None,err_str else: return pyProd,None -def create_producer(endpoint,beamtime_id,stream,token,nthreads): - return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads) +def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback): + return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads,py_callback) __version__ = "@ASAPO_VERSION_PYTHON@" diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h index c59b8d8c7e0de1e7dd84dd7d36a0e52f09d8badc..034e0fc875ae62f7018e800960ed576b4fe09695 100644 --- a/producer/api/python/asapo_wrappers.h +++ b/producer/api/python/asapo_wrappers.h @@ -1,6 +1,10 @@ #ifndef ASAPO_ASAPO_WRAPPERS_H #define ASAPO_ASAPO_WRAPPERS_H +#include <memory> +#include <functional> + + namespace asapo { inline std::string GetErrorString(asapo::Error* err) { @@ -10,6 +14,21 @@ inline std::string GetErrorString(asapo::Error* err) { return ""; } +using cy_callback = void (*)(void*, GenericRequestHeader header, Error err); + +class function_wrapper { + public: + static + RequestCallback make_std_function(cy_callback callback, void* c_self) + { + RequestCallback wrapper = [=](GenericRequestHeader header, Error err) -> void + { + callback(c_self, header, std::move(err)); + }; + return wrapper; + } +}; + } diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py index a1c1abd58c6879678675745d48ab30dea3bde884..9e93ff9768db5bef591f48224bc295aee5f193c4 100644 --- a/tests/manual/python_tests/producer/test.py +++ b/tests/manual/python_tests/producer/test.py @@ -11,10 +11,19 @@ stream = "stream" token = "" nthreads = 1 -producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) +def callback(i,j): + print(i,j) + + +producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads,callback) if err is not None: print(err) + sys.exit(1) else: print(producer) + +producer.send_file("") + +