From 17546d81e7d903d65b76d917ae704543e66409c1 Mon Sep 17 00:00:00 2001
From: karnem <mikhail.karnevskiy@desy.de>
Date: Fri, 17 Nov 2023 14:04:57 +0100
Subject: [PATCH] Fix documentation

---
 docs/site/changelog/2023-11-21-23.11.0.md     | 29 ++++++++++++++++
 docs/site/docs/cookbook/simple-producer.mdx   |  2 +-
 .../version-23.11.0/getting-started.mdx       |  2 +-
 .../version-23.11.0/python/consume_dataset.py | 16 ++++-----
 .../version-23.11.0/python/pipeline.py        | 33 +++++++++----------
 .../version-23.11.0/python/produce_dataset.py | 31 ++++++++---------
 6 files changed, 67 insertions(+), 46 deletions(-)
 create mode 100644 docs/site/changelog/2023-11-21-23.11.0.md

diff --git a/docs/site/changelog/2023-11-21-23.11.0.md b/docs/site/changelog/2023-11-21-23.11.0.md
new file mode 100644
index 000000000..25aea5e35
--- /dev/null
+++ b/docs/site/changelog/2023-11-21-23.11.0.md
@@ -0,0 +1,29 @@
+---
+title: Version 23.11.0
+author: Mikhail Karnevskiy
+author_title: DESY IT
+tags: [release]
+---
+
+# Changelog for version 23.11.0
+
+FEATURES
+* Consumer API: `get_next` and `get_next_dataset` now have a flag `ordered`, which defaults to `true` (for backward compatibility). If the flag is set to `false`, the function delivers the next available message. For datasets, messages are ordered by the arrival time of the first subset in the dataset.
+* Consumer API: `get_stream_list` now has a flag `detailed`, which is `true` by default. If set to `false`, the information about the last timestamp, last message id and stream finished flag is not updated. This speeds up the request when there is a large number of streams.
+* Consumer API: The new function `get_source_list` returns the list of all data sources for a given beamtime.
+* Development tool: A new Docker container that provides a standalone ASAPO service has been created. The ASAPO service can now be launched with a single command and requires only a Docker installation.
+
+IMPROVEMENTS
+* Speed-up of the `get_stream_list` function, achieved by caching the list of streams in a MongoDB collection. The timestamp of the earliest message is now fixed (previously it was updated with each call of the function).
+* Monitoring: ASAPO monitoring provides detailed information for the last 12 hours only. Older information is given in compressed form.
+
+BUG FIXES
+* Consumer API: If the resend functionality is activated, incomplete datasets will be re-sent.
+
+VERSION COMPATIBILITY
+* Clients built against previous library versions should continue to work.
+
+INTERNAL
+* The list of streams is stored in a MongoDB collection. This collection is populated when `get_stream_list` is called.
+* Messages are stored in MongoDB with an internal auto-incremented `_id`. The user-given `id` is stored in `message_id`, which is indexed and used to retrieve data by `id`.
+* The Docker image `asapo-services-linux-build-env` can now run a fully functional ASAPO service. This is used to run integration tests in the GitLab CI.

diff --git a/docs/site/docs/cookbook/simple-producer.mdx b/docs/site/docs/cookbook/simple-producer.mdx
index cf46baaf6..c098c9e24 100644
--- a/docs/site/docs/cookbook/simple-producer.mdx
+++ b/docs/site/docs/cookbook/simple-producer.mdx
@@ -83,7 +83,7 @@ Next we schedule the actual sending. This function call does not perform the act
 </TabItem>
 </Tabs>
 
-The sending of the messages will probably be done in a loop. After all the data is sent, some additional actions might be done. You may want to wait for all the background requests to be finished before doing something else or exiting the application.
+The sending of the messages will probably be done in a loop. After all the data is sent, some additional actions might be done. You may want to wait for all the background requests to be finished before doing something else or exiting the application. Do not delete the producer object until the sending is finished; otherwise the sending process will be interrupted.
 
 <Tabs
   groupId="language"
diff --git a/docs/site/versioned_docs/version-23.11.0/getting-started.mdx b/docs/site/versioned_docs/version-23.11.0/getting-started.mdx
index 987aa6df6..b38af0305 100644
--- a/docs/site/versioned_docs/version-23.11.0/getting-started.mdx
+++ b/docs/site/versioned_docs/version-23.11.0/getting-started.mdx
@@ -33,7 +33,7 @@ LOCAL_LOGSFOLDER=~/asapo_logs # path for the local folder for the logs
 docker run --name asapo -v ${LOCAL_MONGODB}:/var/lib/mongodb/ \
   -v ${LOCAL_INFLUXDB}:/var/lib/influxdb/ \
   -v ${LOCAL_DATAFOLDER}:/tmp/asapo/receiver/files/test_facility/gpfs/test/2019/data/ \
-  -v ${LOCAL_LOGSFOLDER}:/tmp/logs/
+  -v ${LOCAL_LOGSFOLDER}:/tmp/logs/ \
   --network host gitlab.desy.de:5555/asapo/asapo/asapo-standalone-dev:latest
 ```
diff --git a/docs/site/versioned_examples/version-23.11.0/python/consume_dataset.py b/docs/site/versioned_examples/version-23.11.0/python/consume_dataset.py
index 208ecf633..df6d2e3a4 100644
--- a/docs/site/versioned_examples/version-23.11.0/python/consume_dataset.py
+++ b/docs/site/versioned_examples/version-23.11.0/python/consume_dataset.py
@@ -7,10 +7,10 @@ endpoint = "localhost:8400"
 beamtime = "asapo_test"
 
 token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
-            "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
-            "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
-            "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
-            "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
+"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
+"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
+"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
+"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
 
 path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"
 
@@ -22,14 +22,14 @@ group_id = consumer.generate_group_id()
 try:
     # get_next_dataset behaves similarly to the regular get_next
     while True:
-        dataset = consumer.get_next_dataset(group_id, stream='dateset001')
-        print('Dataset Id:', dataset['id'])
+        dataset = consumer.get_next_dataset(group_id, stream = 'default')
+        print ('Dataset Id:', dataset['id'])
         # the initial response only contains the metadata
         # the actual content should be retrieved separately
         for metadata in dataset['content']:
             data = consumer.retrieve_data(metadata)
-            print('Part ' + str(metadata['dataset_substream']) + ' out of ' + str(dataset['expected_size']))
-            print(data.tobytes().decode("utf-8"), metadata)
+            print ('Part ' + str(metadata['dataset_substream']) + ' out of ' + str(dataset['expected_size']))
+            print (data.tobytes().decode("utf-8"), metadata)
 
 except asapo_consumer.AsapoStreamFinishedError:
     print('stream finished')
diff --git a/docs/site/versioned_examples/version-23.11.0/python/pipeline.py b/docs/site/versioned_examples/version-23.11.0/python/pipeline.py
index 0105aa03a..a9cb49637 100644
--- a/docs/site/versioned_examples/version-23.11.0/python/pipeline.py
+++ b/docs/site/versioned_examples/version-23.11.0/python/pipeline.py
@@ -1,24 +1,22 @@
 import asapo_consumer
 import asapo_producer
-
-def callback(payload, err):
+def callback(payload,err):
     if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning):
-        print("could not send: ", payload, err)
+        print("could not send: ",payload,err)
     elif err is not None:
-        print("sent with warning: ", payload, err)
+        print("sent with warning: ",payload,err)
     else:
-        print("successfuly sent: ", payload)
-
+        print("successfully sent: ",payload)
 endpoint = "localhost:8400"
 beamtime = "asapo_test"
 
 token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
-            "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
-            "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
-            "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
-            "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
+"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
+"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
+"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
+"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
 
 path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"
 
@@ -34,21 +32,20 @@ pipelined_stream_name = 'pipelined'
 try:
     while True:
         # we expect the message to be in the 'default' stream already
-        data, meta = consumer.get_next(group_id, meta_only=False)
+        data, meta = consumer.get_next(group_id, meta_only = False)
         message_id = meta['_id']
-        
+
         # work on our data
         text_data = data.tobytes().decode("utf-8")
        pipelined_message = (text_data + ' processed').encode()
-        
+
         # you may use the same filename, if you want to rewrite the source file. This will result in warning, but it is a valid usecase
-        producer.send(message_id, f"processed/test_file_{message_id}", pipelined_message, pipelined_stream_name,
-                      callback=callback)
-        
+        producer.send(message_id, "processed/test_file_" + str(message_id), pipelined_message, pipelined_stream_name, callback = callback)
+
 except asapo_consumer.AsapoStreamFinishedError:
     print('stream finished')
-    
+
 except asapo_consumer.AsapoEndOfStreamError:
     print('stream ended')
 # pipeline snippet_end
@@ -62,4 +59,4 @@ producer.send_stream_finished_flag("pipelined", last_id)
 # finish snippet_end
 
 # you can remove the source stream if you do not need it anymore
-consumer.delete_stream(stream='default', error_on_not_exist=True)
+consumer.delete_stream(stream = 'default', error_on_not_exist = True)
diff --git a/docs/site/versioned_examples/version-23.11.0/python/produce_dataset.py b/docs/site/versioned_examples/version-23.11.0/python/produce_dataset.py
index 70ec581f6..e82b71177 100644
--- a/docs/site/versioned_examples/version-23.11.0/python/produce_dataset.py
+++ b/docs/site/versioned_examples/version-23.11.0/python/produce_dataset.py
@@ -1,40 +1,35 @@
 import asapo_producer
-
-def callback(payload, err):
+def callback(payload,err):
     if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning):
-        print("could not send: ", payload, err)
+        print("could not send: ",payload,err)
     elif err is not None:
-        print("sent with warning: ", payload, err)
+        print("sent with warning: ",payload,err)
     else:
-        print("successfuly sent: ", payload)
-
+        print("successfully sent: ",payload)
 endpoint = "localhost:8400"
 beamtime = "asapo_test"
 
 token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
-            "yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
-            "1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
-            "2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
-            "dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
+"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN"
+"1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY"
+"2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJ"
+"dfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk")
 
 producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', token, 1, 60000)
 
 # dataset snippet_start
-# assuming we have three different producers for a single dataset
+#assuming we have three different producers for a single dataset
 # add the additional 'dataset' paremeter, which should be (<part_number>, <total_parts_in_dataset>)
-producer.send(1, "processed/test_file_dataset_1", b"hello dataset 1", dataset=(1, 3), callback=callback,
-              stream="dateset001")
+producer.send(1, "processed/test_file_dataset_1", b"hello dataset 1", dataset = (1,3), callback = callback)
 # this can be done from different producers in any order
-producer.send(1, "processed/test_file_dataset_1", b"hello dataset 2", dataset=(2, 3), callback=callback,
-              stream="dateset001")
-producer.send(1, "processed/test_file_dataset_1", b"hello dataset 3", dataset=(3, 3), callback=callback,
-              stream="dateset001")
+producer.send(1, "processed/test_file_dataset_1", b"hello dataset 2", dataset = (2,3), callback = callback)
+producer.send(1, "processed/test_file_dataset_1", b"hello dataset 3", dataset = (3,3), callback = callback)
 # dataset snippet_end
 
 producer.wait_requests_finished(2000)
 
 # the dataset parts are not counted towards the number of messages in the stream
 # the last message id in this example is still 1
-producer.send_stream_finished_flag("dateset001", 1)
+producer.send_stream_finished_flag("default", 1)
--
GitLab
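
The changelog above describes the new consumer options without a usage example. Below is a minimal, illustrative Python sketch of how they might be called. It assumes the Python bindings expose the new options as keyword arguments named exactly as in the changelog (`ordered`, `detailed`) and that `get_source_list` takes no arguments; none of these signatures are confirmed by this patch, and the token and paths are the placeholder values used elsewhere in the docs.

```python
import asapo_consumer

endpoint = "localhost:8400"
beamtime = "asapo_test"
token = "<beamtime token>"
path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"

consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True,
                                          beamtime, "test_source", token, 5000)
group_id = consumer.generate_group_id()

# deliver the next available message instead of enforcing strict id order
# (keyword name 'ordered' assumed from the changelog entry)
data, meta = consumer.get_next(group_id, meta_only=False, ordered=False)
print('got message', meta['_id'])

# faster stream listing that skips last id / timestamp / finished-flag details
# (keyword name 'detailed' assumed from the changelog entry)
streams = consumer.get_stream_list(detailed=False)
print('streams:', [stream['name'] for stream in streams])

# list all data sources of the beamtime (new in 23.11.0; return format assumed)
print('sources:', consumer.get_source_list())
```

Per the changelog, `ordered=False` is useful when strict id order is not required, and `detailed=False` trades per-stream metadata for a faster listing when a beamtime has many streams.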