diff --git a/docs/site/examples/python/consume.py b/docs/site/examples/python/consume.py index e0133cebca993145ac8cf140920fab11c9d06c7e..6678e83a02111f193929687e2793c2ea142a489f 100644 --- a/docs/site/examples/python/consume.py +++ b/docs/site/examples/python/consume.py @@ -1,58 +1,59 @@ import asapo_consumer -#create snippet_start -endpoint = "localhost:8400" -beamtime = "asapo_test" - +# create snippet_start # test token. In production it is created during the start of the beamtime -token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" -"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN" -"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY" -"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ" -"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk") - -# set it according to your configuration. -path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" - - -consumer = asapo_consumer \ - .create_consumer(endpoint, - path_to_files, - True, # True if the path_to_files is accessible locally, False otherwise - beamtime, # Same as for the producer - "test_source", # Same as for the producer - token, # Access token - 5000, # Timeout. How long do you want to wait on non-finished stream for a message. - 'test_consumer_instance', # conumser instance id (can be 'auto') - 'pipeline_step_1' # pipeline step id - ) -#create snippet_end - -#list snippet_start +token = str( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" + "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN" + "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY" + "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ" + "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk" +) + +consumer = asapo_consumer.create_consumer( + server_name="localhost:8400", + source_path="auto", + has_filesystem=False, # Ignore for now + beamtime_id="asapo_test", # Same as for the producer + data_source="test_source", # Same as for the producer + token=token, + timeout_ms=5000, # How long do you want to wait on non-finished stream for a message. + instance_id="test_consumer_instance", # consumer instance id (can be 'auto') + pipeline_step="pipeline_step_1", +) +# create snippet_end + +# list snippet_start for stream in consumer.get_stream_list(): - print("Stream name: ", stream['name'], "\n", - "LastId: ", stream['lastId'], "\n", - "Stream finished: ", stream['finished'], "\n", - "Next stream: ", stream['nextStream']) -#list snippet_end - -#consume snippet_start -group_id = consumer.generate_group_id() # Several consumers can use the same group_id to process messages in parallel + print( + f"Stream name: {stream['name']}\n" + f"LastId: {stream['lastId']}\n" + f"Stream finished: {stream['finished']}\n" + f"Next stream: {stream['nextStream']}" + ) +# list snippet_end + +# consume snippet_start +group_id = ( + consumer.generate_group_id() +) # Several consumers can use the same group_id to process messages in parallel try: # get_next is the main function to get messages from streams. You would normally call it in loop. # you can either manually compare the meta['_id'] to the stream['lastId'], or wait for the exception to happen while True: - data, meta = consumer.get_next(group_id, meta_only = False) + data, meta = consumer.get_next(group_id, meta_only=False) print(data.tobytes().decode("utf-8"), meta) except asapo_consumer.AsapoStreamFinishedError: - print('stream finished') # all the messages in the stream were processed + print("stream finished") # all the messages in the stream were processed except asapo_consumer.AsapoEndOfStreamError: - print('stream ended') # not-finished stream timeout, or wrong or empty stream -#consume snippet_end - -#delete snippet_start -consumer.delete_stream(error_on_not_exist = True) # you can delete the stream after consuming -#delete cnippet_end + print("stream ended") # not-finished stream timeout, or wrong or empty stream +# consume snippet_end + +# delete snippet_start +consumer.delete_stream( + error_on_not_exist=True +) # you can delete the stream after consuming +# delete cnippet_end diff --git a/docs/site/examples/python/produce.py b/docs/site/examples/python/produce.py index 9a5303ffae093fe8b274a09533bce5af5c1d19f5..c7506ad5f4340b162bee1409044904b461802598 100644 --- a/docs/site/examples/python/produce.py +++ b/docs/site/examples/python/produce.py @@ -1,60 +1,68 @@ import asapo_producer + # callback snippet_start -def callback(payload,err): +def callback(payload, err): if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): # the data was not sent. Something is terribly wrong. - print("could not send: ",payload,err) + print("could not send: ", payload, err) elif err is not None: # The data was sent, but there was some unexpected problem, e.g. the file was overwritten. - print("sent with warning: ",payload,err) + print("sent with warning: ", payload, err) else: # all fine - print("successfuly sent: ",payload) + print("successfuly sent: ", payload) + + # callback snippet_end # create snippet_start -endpoint = "localhost:8400" -beamtime = "asapo_test" - -token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" -"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN" -"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY" -"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ" -"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk") - -producer = asapo_producer \ - .create_producer(endpoint, - 'processed', # should be 'processed' or 'raw', 'processed' writes to the core FS - beamtime, # the folder should exist - 'auto', # can be 'auto', if beamtime_id is given - 'test_source', # source - token, # authorization token - 1, # number of threads. Increase, if the sending speed seems slow - 60000, # timeout. Do not change. - 'test_producer_instance', # producer instance id (can be 'auto') - 'pipeline_step_1' # pipeline step id - ) - -producer.set_log_level("error") # other values are "warning", "info" or "debug". +token = str( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" + "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN" + "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY" + "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ" + "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk" +) + +producer = asapo_producer.create_producer( + endpoint="localhost:8400", + type="processed", # should be 'processed' or 'raw', 'processed' writes to the core FS + beamtime_id="asapo_test", # the folder should exist + beamline="auto", # can be 'auto', if beamtime_id is given + data_source="test_source", + token=token, + nthreads=1, # number of threads. Increase, if the sending speed seems slow + timeout_ms=60000, + instance_id="test_producer_instance", # producer instance id (can be 'auto') + pipeline_step="pipeline_step_1", +) + +producer.set_log_level("error") # other values are "warning", "info" or "debug". # create snippet_end # send snippet_start # we are sending a message with with index 1 to the default stream. Filename must start with processed/ -producer.send(1, # message number. Should be unique and ordered. - "processed/test_file", # name of the file. Should be unique, or it will be overwritten - b"hello", # binary data - callback = callback) # callback +producer.send( + id=1, # message number. Should be unique and ordered. + exposed_path="processed/test_file", # name of the file. Should be unique, or it will be overwritten + data=b"hello", # binary data + callback=callback, # this is called after the asynchronous send operation finishes +) # send snippet_end # send data in loop # add the following at the end of the script # finish snippet_start -producer.wait_requests_finished(2000) # will synchronously wait for all the data to be sent. - # Use it when no more data is expected. +producer.wait_requests_finished( + timeout_ms=2000 +) # will synchronously wait for all the data to be sent. +# Use it when no more data is expected. # you may want to mark the stream as finished -producer.send_stream_finished_flag("default", # name of the stream. If you didn't specify the stream in 'send', it would be 'default' - 1) # the number of the last message in the stream +producer.send_stream_finished_flag( + stream="default", # name of the stream. If you didn't specify the stream in 'send', it would be 'default' + last_id=1, +) # the number of the last message in the stream # finish snippet_end