Skip to content
Snippets Groups Projects
Commit 3e202e2a authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Fix but with formatting of int to string. Improve formatting.

parent 98ee0469
No related branches found
No related tags found
No related merge requests found
import asapo_consumer
import asapo_producer
def callback(payload,err):
def callback(payload, err):
if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning):
print("could not send: ",payload,err)
print("could not send: ", payload, err)
elif err is not None:
print("sent with warning: ",payload,err)
print("sent with warning: ", payload, err)
else:
print("successfuly sent: ",payload)
print("successfuly sent: ", payload)
endpoint = "localhost:8400"
beamtime = "asapo_test"
token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"
......@@ -32,20 +34,21 @@ pipelined_stream_name = 'pipelined'
try:
while True:
# we expect the message to be in the 'default' stream already
data, meta = consumer.get_next(group_id, meta_only = False)
data, meta = consumer.get_next(group_id, meta_only=False)
message_id = meta['_id']
# work on our data
text_data = data.tobytes().decode("utf-8")
pipelined_message = (text_data + ' processed').encode()
# you may use the same filename, if you want to rewrite the source file. This will result in warning, but it is a valid usecase
producer.send(message_id, "processed/test_file_" + message_id, pipelined_message, pipelined_stream_name, callback = callback)
producer.send(message_id, f"processed/test_file_{message_id}", pipelined_message, pipelined_stream_name,
callback=callback)
except asapo_consumer.AsapoStreamFinishedError:
print('stream finished')
except asapo_consumer.AsapoEndOfStreamError:
print('stream ended')
# pipeline snippet_end
......@@ -59,4 +62,4 @@ producer.send_stream_finished_flag("pipelined", last_id)
# finish snippet_end
# you can remove the source stream if you do not need it anymore
consumer.delete_stream(stream = 'default', error_on_not_exist = True)
consumer.delete_stream(stream='default', error_on_not_exist=True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment