diff --git a/docs/site/versioned_examples/version-23.11.0/python/pipeline.py b/docs/site/versioned_examples/version-23.11.0/python/pipeline.py index a9cb49637577101e8eaccbf7f5143f0451afee01..0105aa03a7f451045ced3b7eb06468085f368082 100644 --- a/docs/site/versioned_examples/version-23.11.0/python/pipeline.py +++ b/docs/site/versioned_examples/version-23.11.0/python/pipeline.py @@ -1,22 +1,24 @@ import asapo_consumer import asapo_producer -def callback(payload,err): + +def callback(payload, err): if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): - print("could not send: ",payload,err) + print("could not send: ", payload, err) elif err is not None: - print("sent with warning: ",payload,err) + print("sent with warning: ", payload, err) else: - print("successfuly sent: ",payload) + print("successfuly sent: ", payload) + endpoint = "localhost:8400" beamtime = "asapo_test" token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" -"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN" -"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY" -"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ" -"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk") + "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN" + "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY" + "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ" + "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk") path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" @@ -32,20 +34,21 @@ pipelined_stream_name = 'pipelined' try: while True: # we expect the message to be in the 'default' stream already - data, meta = consumer.get_next(group_id, meta_only = False) + data, meta = consumer.get_next(group_id, meta_only=False) message_id = meta['_id'] - + # work on our data text_data = data.tobytes().decode("utf-8") pipelined_message = (text_data + ' processed').encode() - + # you may use the same filename, if you want to rewrite the source file. This will result in warning, but it is a valid usecase - producer.send(message_id, "processed/test_file_" + message_id, pipelined_message, pipelined_stream_name, callback = callback) - + producer.send(message_id, f"processed/test_file_{message_id}", pipelined_message, pipelined_stream_name, + callback=callback) + except asapo_consumer.AsapoStreamFinishedError: print('stream finished') - + except asapo_consumer.AsapoEndOfStreamError: print('stream ended') # pipeline snippet_end @@ -59,4 +62,4 @@ producer.send_stream_finished_flag("pipelined", last_id) # finish snippet_end # you can remove the source stream if you do not need it anymore -consumer.delete_stream(stream = 'default', error_on_not_exist = True) +consumer.delete_stream(stream='default', error_on_not_exist=True)