Skip to content
Snippets Groups Projects
Commit 25e48efa authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix memleaks, python tests

parent 93b26ac4
No related branches found
No related tags found
No related merge requests found
...@@ -176,7 +176,6 @@ void asapo_consumer_set_resend_nacs(AsapoConsumerHandle consumer, ...@@ -176,7 +176,6 @@ void asapo_consumer_set_resend_nacs(AsapoConsumerHandle consumer,
uint64_t delay_ms, uint64_t delay_ms,
uint64_t resend_attempts); uint64_t resend_attempts);
// TODO: What happend to asapo_create_source_credentials?
const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data); const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data);
const char* asapo_message_meta_get_name(const AsapoMessageMetaHandle md); const char* asapo_message_meta_get_name(const AsapoMessageMetaHandle md);
void asapo_message_meta_get_timestamp(const AsapoMessageMetaHandle md, void asapo_message_meta_get_timestamp(const AsapoMessageMetaHandle md,
......
...@@ -427,7 +427,7 @@ cdef class __PyConsumerFactory: ...@@ -427,7 +427,7 @@ cdef class __PyConsumerFactory:
consumer.c_consumer.get().SetTimeout(timeout) consumer.c_consumer.get().SetTimeout(timeout)
return consumer return consumer
def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms): def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms,instance_id='auto',pipeline_step='auto'):
""" """
:param server_name: Server endpoint (hostname:port) :param server_name: Server endpoint (hostname:port)
:type server_name: string :type server_name: string
...@@ -435,10 +435,6 @@ def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_ ...@@ -435,10 +435,6 @@ def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_
:type source_path: string :type source_path: string
:param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data :param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data
:type has_filesystem: bool :type has_filesystem: bool
:param instance_id: instance id, can be "auto", will create a combination out of hostname and pid
:type instance_id: string
:param pipeline_step: pipeline step id, can be "auto", "DefaultStep" is used then
:type pipeline_step: string
:param beamline: beamline name, can be "auto" if beamtime_id is given :param beamline: beamline name, can be "auto" if beamtime_id is given
:type beamline: string :type beamline: string
:param data_source: name of the data source that produces data :param data_source: name of the data source that produces data
...@@ -449,6 +445,10 @@ def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_ ...@@ -449,6 +445,10 @@ def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_
:type nthreads: int :type nthreads: int
:param timeout_ms: send requests timeout in milliseconds :param timeout_ms: send requests timeout in milliseconds
:type timeout_ms: int :type timeout_ms: int
:param instance_id: instance id, can be "auto" (default), will create a combination out of hostname and pid
:type instance_id: string
:param pipeline_step: pipeline step id, can be "auto" (default), "DefaultStep" is used then
:type pipeline_step: string
:return: consumer object and error. (None,err) if case of error, (consumer, None) if success :return: consumer object and error. (None,err) if case of error, (consumer, None) if success
:rtype: Tuple with consumer object and error. :rtype: Tuple with consumer object and error.
""" """
......
This diff is collapsed.
...@@ -491,16 +491,12 @@ cdef class PyProducer: ...@@ -491,16 +491,12 @@ cdef class PyProducer:
throw_exception(err) throw_exception(err)
return pyProd return pyProd
def create_producer(endpoint,type,instance_id,pipeline_step,beamtime_id,beamline,data_source,token,nthreads,timeout_ms): def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_ms,instance_id='auto',pipeline_step='auto'):
""" """
:param endpoint: server endpoint (url:port) :param endpoint: server endpoint (url:port)
:type endpoint: string :type endpoint: string
:param type: source type, "raw" to write to "raw" folder in beamline filesystem,"processed" to write to "processed" folder in core filesystem :param type: source type, "raw" to write to "raw" folder in beamline filesystem,"processed" to write to "processed" folder in core filesystem
:type type: string :type type: string
:param instance_id: instance id, can be "auto", will create a combination out of hostname and pid
:type instance_id: string
:param pipeline_step: pipeline step id, can be "auto", "DefaultStep" is used then
:type pipeline_step: string
:param beamline: beamline name, can be "auto" if beamtime_id is given :param beamline: beamline name, can be "auto" if beamtime_id is given
:type beamline: string :type beamline: string
:param data_source: name of the data source that produces data :param data_source: name of the data source that produces data
...@@ -511,6 +507,10 @@ def create_producer(endpoint,type,instance_id,pipeline_step,beamtime_id,beamline ...@@ -511,6 +507,10 @@ def create_producer(endpoint,type,instance_id,pipeline_step,beamtime_id,beamline
:type nthreads: int :type nthreads: int
:param timeout_ms: send requests timeout in milliseconds :param timeout_ms: send requests timeout in milliseconds
:type timeout_ms: int :type timeout_ms: int
:param instance_id: instance id, can be "auto" (default), will create a combination out of hostname and pid
:type instance_id: string
:param pipeline_step: pipeline step id, can be "auto" (default), "DefaultStep" is used then
:type pipeline_step: string
:raises: :raises:
AsapoWrongInputError: wrong input (number of threads, ,,,) AsapoWrongInputError: wrong input (number of threads, ,,,)
AsapoProducerError: actually should not happen AsapoProducerError: actually should not happen
......
...@@ -25,7 +25,7 @@ class AsapoSender: ...@@ -25,7 +25,7 @@ class AsapoSender:
def _callback(self, header, err): def _callback(self, header, err):
print ("hello self callback") print ("hello self callback")
producer = asapo_producer.create_producer(endpoint,'processed','auto','auto',beamtime,'auto', data_source, token, nthreads, 600000) producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000)
producer.set_log_level("debug") producer.set_log_level("debug")
print(asapo_producer.__version__) print(asapo_producer.__version__)
......
...@@ -27,7 +27,7 @@ def callback(header,err): ...@@ -27,7 +27,7 @@ def callback(header,err):
lock.release() lock.release()
producer = asapo_producer.create_producer(endpoint,'processed','auto','auto','auto',beamline, data_source, token, nthreads, 60000) producer = asapo_producer.create_producer(endpoint,'processed','auto',beamline, data_source, token, nthreads, 60000)
producer.set_log_level("debug") producer.set_log_level("debug")
......
...@@ -47,7 +47,7 @@ def assert_version(version): ...@@ -47,7 +47,7 @@ def assert_version(version):
if not ok: if not ok:
sys.exit(1) sys.exit(1)
producer = asapo_producer.create_producer(endpoint,'processed','auto','auto', beamtime, 'auto', data_source, token, nthreads, 60000) producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 60000)
producer.set_log_level("debug") producer.set_log_level("debug")
...@@ -233,7 +233,7 @@ else: ...@@ -233,7 +233,7 @@ else:
# create with error # create with error
try: try:
producer = asapo_producer.create_producer(endpoint,'processed','auto','auto', beamtime, 'auto', data_source, token, 0, 0) producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, 0, 0)
except asapo_producer.AsapoWrongInputError as e: except asapo_producer.AsapoWrongInputError as e:
print(e) print(e)
else: else:
......
{ {
"DatabaseServer":"127.0.0.1:27017", "DatabaseServer":"127.0.0.1:27017",
"DiscoveryServer": "localhost:8400/asapo-discovery",
"PerformanceDbServer": "localhost:8086", "PerformanceDbServer": "localhost:8086",
"MonitorPerformance": true, "MonitorPerformance": true,
"MonitoringServerUrl":"auto", "MonitoringServerUrl":"auto",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment