Skip to content
Snippets Groups Projects
Commit 74340c4a authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Give stream name explicitly. Improve formatting.

parent 0c4adfb0
No related branches found
No related tags found
No related merge requests found
......@@ -7,10 +7,10 @@ endpoint = "localhost:8400"
beamtime = "asapo_test"
token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"
......@@ -22,14 +22,14 @@ group_id = consumer.generate_group_id()
try:
# get_next_dataset behaves similarly to the regular get_next
while True:
dataset = consumer.get_next_dataset(group_id, stream = 'default')
print ('Dataset Id:', dataset['id'])
dataset = consumer.get_next_dataset(group_id, stream='dateset001')
print('Dataset Id:', dataset['id'])
# the initial response only contains the metadata
# the actual content should be retrieved separately
for metadata in dataset['content']:
data = consumer.retrieve_data(metadata)
print ('Part ' + str(metadata['dataset_substream']) + ' out of ' + str(dataset['expected_size']))
print (data.tobytes().decode("utf-8"), metadata)
print('Part ' + str(metadata['dataset_substream']) + ' out of ' + str(dataset['expected_size']))
print(data.tobytes().decode("utf-8"), metadata)
except asapo_consumer.AsapoStreamFinishedError:
print('stream finished')
......
import asapo_producer
def callback(payload,err):
def callback(payload, err):
if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning):
print("could not send: ",payload,err)
print("could not send: ", payload, err)
elif err is not None:
print("sent with warning: ",payload,err)
print("sent with warning: ", payload, err)
else:
print("successfuly sent: ",payload)
print("successfuly sent: ", payload)
endpoint = "localhost:8400"
beamtime = "asapo_test"
token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', token, 1, 60000)
# dataset snippet_start
#assuming we have three different producers for a single dataset
# assuming we have three different producers for a single dataset
# add the additional 'dataset' paremeter, which should be (<part_number>, <total_parts_in_dataset>)
producer.send(1, "processed/test_file_dataset_1", b"hello dataset 1", dataset = (1,3), callback = callback)
producer.send(1, "processed/test_file_dataset_1", b"hello dataset 1", dataset=(1, 3), callback=callback,
stream="dateset001")
# this can be done from different producers in any order
producer.send(1, "processed/test_file_dataset_1", b"hello dataset 2", dataset = (2,3), callback = callback)
producer.send(1, "processed/test_file_dataset_1", b"hello dataset 3", dataset = (3,3), callback = callback)
producer.send(1, "processed/test_file_dataset_1", b"hello dataset 2", dataset=(2, 3), callback=callback,
stream="dateset001")
producer.send(1, "processed/test_file_dataset_1", b"hello dataset 3", dataset=(3, 3), callback=callback,
stream="dateset001")
# dataset snippet_end
producer.wait_requests_finished(2000)
# the dataset parts are not counted towards the number of messages in the stream
# the last message id in this example is still 1
producer.send_stream_finished_flag("default", 1)
producer.send_stream_finished_flag("dateset001", 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment