Commit 8e206ea1 authored by Mikhail Karnevskiy

Merge branch 'feature/readable-args' into 'develop'

Use keywords instead of positional arguments:

See merge request asapo/asapo!213
parents 244b0816 6b76262f
import asapo_consumer
# create snippet_start
# test token. In production it is created during the start of the beamtime
token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
# set it according to your configuration.
path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"
consumer = asapo_consumer \
.create_consumer(endpoint,
path_to_files,
True, # True if the path_to_files is accessible locally, False otherwise
beamtime, # Same as for the producer
"test_source", # Same as for the producer
token, # Access token
5000, # Timeout. How long do you want to wait on non-finished stream for a message.
'test_consumer_instance', # conumser instance id (can be 'auto')
'pipeline_step_1' # pipeline step id
)
#create snippet_end
#list snippet_start
token = str(
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
    "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
    "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
    "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
    "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk"
)
consumer = asapo_consumer.create_consumer(
    server_name="localhost:8400",
    source_path="auto",
    has_filesystem=False,  # Ignore for now
    beamtime_id="asapo_test",  # Same as for the producer
    data_source="test_source",  # Same as for the producer
    token=token,
    timeout_ms=5000,  # How long to wait on a non-finished stream for a message
    instance_id="test_consumer_instance",  # consumer instance id (can be 'auto')
    pipeline_step="pipeline_step_1",
)
# create snippet_end
# list snippet_start
for stream in consumer.get_stream_list():
    print(
        f"Stream name: {stream['name']}\n"
        f"LastId: {stream['lastId']}\n"
        f"Stream finished: {stream['finished']}\n"
        f"Next stream: {stream['nextStream']}"
    )
# list snippet_end
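
# Since get_stream_list() reports a 'finished' flag for every stream, you can,
# for example, poll until the producer has finished a stream before consuming
# it. A minimal sketch using only the fields shown above; the stream name
# 'default' and the one-second poll interval are illustrative assumptions.
import time

while True:
    streams = {s['name']: s for s in consumer.get_stream_list()}
    if streams.get('default', {}).get('finished'):
        break
    time.sleep(1)  # wait and re-check until the stream is marked finished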
# consume snippet_start
group_id = (
    consumer.generate_group_id()
)  # Several consumers can use the same group_id to process messages in parallel
try:
    # get_next is the main function to get messages from streams. You would normally call it in a loop.
    # You can either compare meta['_id'] to stream['lastId'] manually, or wait for the exception.
    while True:
        data, meta = consumer.get_next(group_id, meta_only=False)
        print(data.tobytes().decode("utf-8"), meta)
except asapo_consumer.AsapoStreamFinishedError:
    print("stream finished")  # all the messages in the stream were processed
except asapo_consumer.AsapoEndOfStreamError:
    print("stream ended")  # non-finished stream timeout, or a wrong or empty stream
# consume snippet_end
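
# As noted above, several consumers can share one group_id so that each message
# is delivered to exactly one of them. A minimal sketch; the second consumer
# below is hypothetical and simply reuses the connection parameters from the
# create snippet (instance_id='auto' lets the library generate a fresh id).
consumer_b = asapo_consumer.create_consumer(
    server_name="localhost:8400",
    source_path="auto",
    has_filesystem=False,
    beamtime_id="asapo_test",
    data_source="test_source",
    token=token,
    timeout_ms=5000,
    instance_id="auto",
    pipeline_step="pipeline_step_1",
)
data, meta = consumer_b.get_next(group_id, meta_only=False)  # same group_id as above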
# delete snippet_start
consumer.delete_stream(
    error_on_not_exist=True
)  # you can delete the stream after consuming
# delete snippet_end

import asapo_producer
# callback snippet_start
def callback(payload, err):
    if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning):
        # the data was not sent. Something is terribly wrong.
        print("could not send: ", payload, err)
    elif err is not None:
        # the data was sent, but there was some unexpected problem, e.g. the file was overwritten
        print("sent with warning: ", payload, err)
    else:
        # all fine
        print("successfully sent: ", payload)
# callback snippet_end
# create snippet_start
endpoint = "localhost:8400"
beamtime = "asapo_test"
token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
producer = asapo_producer \
.create_producer(endpoint,
'processed', # should be 'processed' or 'raw', 'processed' writes to the core FS
beamtime, # the folder should exist
'auto', # can be 'auto', if beamtime_id is given
'test_source', # source
token, # authorization token
1, # number of threads. Increase, if the sending speed seems slow
60000, # timeout. Do not change.
'test_producer_instance', # producer instance id (can be 'auto')
'pipeline_step_1' # pipeline step id
)
producer.set_log_level("error") # other values are "warning", "info" or "debug".
token = str(
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
    "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
    "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
    "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
    "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk"
)
producer = asapo_producer.create_producer(
    endpoint="localhost:8400",
    type="processed",  # should be 'processed' or 'raw'; 'processed' writes to the core FS
    beamtime_id="asapo_test",  # the folder should exist
    beamline="auto",  # can be 'auto' if beamtime_id is given
    data_source="test_source",
    token=token,
    nthreads=1,  # number of threads. Increase if the sending speed seems slow
    timeout_ms=60000,
    instance_id="test_producer_instance",  # producer instance id (can be 'auto')
    pipeline_step="pipeline_step_1",
)
producer.set_log_level("error")  # other values are "warning", "info" or "debug"
# create snippet_end
# send snippet_start
# we are sending a message with index 1 to the default stream. Filename must start with processed/
producer.send(
    id=1,  # message number. Should be unique and ordered.
    exposed_path="processed/test_file",  # name of the file. Should be unique, or it will be overwritten
    data=b"hello",  # binary data
    callback=callback,  # called after the asynchronous send operation finishes
)
# send snippet_end
# send data in loop
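# A minimal sketch of such a loop; the range and file names below are
# illustrative assumptions, not part of the original example. It reuses the
# producer, the callback and the keyword arguments defined above.
for i in range(1, 11):
    producer.send(
        id=i,  # message ids must be unique and ordered
        exposed_path=f"processed/test_file_{i}",  # one unique path per message
        data=f"message {i}".encode(),  # binary payload
        callback=callback,
    )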
# add the following at the end of the script
# finish snippet_start
producer.wait_requests_finished(
    timeout_ms=2000
)  # will synchronously wait for all the data to be sent.
# Use it when no more data is expected.
# you may want to mark the stream as finished
producer.send_stream_finished_flag("default", # name of the stream. If you didn't specify the stream in 'send', it would be 'default'
1) # the number of the last message in the stream
producer.send_stream_finished_flag(
stream="default", # name of the stream. If you didn't specify the stream in 'send', it would be 'default'
last_id=1,
) # the number of the last message in the stream
# finish snippet_end
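
# To verify that the finished flag arrived, the consumer side can re-list the
# streams and inspect the 'finished' and 'lastId' fields. A hedged sketch
# reusing only the calls shown above; the stream name 'default' matches the
# send_stream_finished_flag call.
for stream in consumer.get_stream_list():
    if stream['name'] == 'default' and stream['finished']:
        print('stream', stream['name'], 'finished at id', stream['lastId'])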