Skip to content
Snippets Groups Projects
Commit da114ded authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

callback c/python

parent 83bacfdf
No related branches found
No related tags found
No related merge requests found
......@@ -12,7 +12,7 @@ cdef extern from "asapo_producer.h" namespace "asapo":
cdef extern from "asapo_wrappers.h" namespace "asapo":
cdef string GetErrorString(Error* err)
string GetErrorString(Error* err)
cdef extern from "asapo_producer.h" namespace "asapo":
......@@ -23,8 +23,7 @@ cdef extern from "asapo_producer.h" namespace "asapo":
cdef extern from "asapo_producer.h" namespace "asapo":
cppclass RequestHandlerType:
pass
cdef RequestHandlerType RequestHandlerType_Tcp "asapo::RequestHandlerType::kTcp"
RequestHandlerType RequestHandlerType_Tcp "asapo::RequestHandlerType::kTcp"
cdef extern from "asapo_producer.h" namespace "asapo":
......@@ -34,6 +33,42 @@ cdef extern from "asapo_producer.h" namespace "asapo":
string user_token
cdef extern from "asapo_producer.h" namespace "asapo":
cdef cppclass Producer:
struct EventHeader:
uint64_t file_id
uint64_t file_size
string file_name
string user_metadata
uint64_t subset_id
uint64_t subset_size
cdef extern from "asapo_producer.h" namespace "asapo":
struct EventHeader:
uint64_t file_id
uint64_t file_size
string file_name
string user_metadata
uint64_t subset_id
uint64_t subset_size
cdef extern from "asapo_producer.h" namespace "asapo":
struct GenericRequestHeader:
pass
cdef extern from "asapo_producer.h" namespace "asapo":
cppclass RequestCallback:
pass
cdef extern from "asapo_wrappers.h" namespace "asapo":
cdef cppclass function_wrapper:
ctypedef void (*cy_callback) (void*, GenericRequestHeader, Error)
@staticmethod
RequestCallback make_std_function(cy_callback, void*)
cdef extern from "asapo_producer.h" namespace "asapo":
cppclass Producer:
@staticmethod
unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error)
Error SendFile(const EventHeader& event_header, string full_path, uint64_t injest_mode,RequestCallback callback)
......@@ -27,10 +27,26 @@ cdef bytes _bytes(s):
else:
raise TypeError("Could not convert to unicode.")
ctypedef void (*cb_type)(void*, GenericRequestHeader, Error)
cdef class PyProducer:
cdef unique_ptr[Producer] c_producer
cdef object py_callback
def send_file(self,string fname):
cdef EventHeader event_header
err = self.c_producer.get().SendFile(event_header, _bytes(fname), 1,
function_wrapper.make_std_function(<cb_type>self.c_callback, <void*>self))
cdef err_str = GetErrorString(&err)
if err_str.strip():
return err_str
else:
return "ok"
cdef void c_callback(self,GenericRequestHeader header, Error err) with gil:
self.py_callback(1,2)
return
@staticmethod
def create_producer(endpoint,beamtime_id,stream,token,nthreads):
def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback):
pyProd = PyProducer()
cdef Error err
cdef SourceCredentials source
......@@ -38,14 +54,15 @@ cdef class PyProducer:
source.user_token = token
source.stream = stream
pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,&err)
pyProd.py_callback = py_callback
cdef err_str = GetErrorString(&err)
if err_str.strip():
return None,err_str
else:
return pyProd,None
def create_producer(endpoint,beamtime_id,stream,token,nthreads):
return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads)
def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback):
return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads,py_callback)
__version__ = "@ASAPO_VERSION_PYTHON@"
#ifndef ASAPO_ASAPO_WRAPPERS_H
#define ASAPO_ASAPO_WRAPPERS_H
#include <memory>
#include <functional>
namespace asapo {
inline std::string GetErrorString(asapo::Error* err) {
......@@ -10,6 +14,21 @@ inline std::string GetErrorString(asapo::Error* err) {
return "";
}
using cy_callback = void (*)(void*, GenericRequestHeader header, Error err);
class function_wrapper {
public:
static
RequestCallback make_std_function(cy_callback callback, void* c_self)
{
RequestCallback wrapper = [=](GenericRequestHeader header, Error err) -> void
{
callback(c_self, header, std::move(err));
};
return wrapper;
}
};
}
......
......@@ -11,10 +11,19 @@ stream = "stream"
token = ""
nthreads = 1
producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
def callback(i,j):
print(i,j)
producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads,callback)
if err is not None:
print(err)
sys.exit(1)
else:
print(producer)
producer.send_file("")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment