diff --git a/docs/site/CMakeLists.txt b/docs/site/CMakeLists.txt
index 7f16530b4a7da2043b369fab818e4ea5f9d025b8..ccaa1cf11f0e590b8a36e6bbc81623441b919e7f 100644
--- a/docs/site/CMakeLists.txt
+++ b/docs/site/CMakeLists.txt
@@ -1,21 +1,18 @@
-configure_files(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} true)
-configure_files(${CMAKE_CURRENT_SOURCE_DIR}/blog ${CMAKE_CURRENT_BINARY_DIR}/blog)
-configure_files(${CMAKE_CURRENT_SOURCE_DIR}/docs ${CMAKE_CURRENT_BINARY_DIR}/docs)
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/static DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/versioned_docs DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/versioned_sidebars DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
-
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/src DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/plugins DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
-
-
 file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/examples DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
-configure_files(${CMAKE_CURRENT_SOURCE_DIR}/examples/getting_started ${CMAKE_CURRENT_BINARY_DIR}/examples/getting_started)
-
-
 add_custom_target(
     site ALL
     COMMAND npm run build
     VERBATIM
 )
+
+add_custom_target(
+    freeze_version
+    COMMAND
+    ./freeze_version.sh
+)
diff --git a/docs/site/blog/2021-03-28-hi.md b/docs/site/blog/2021-03-28-hi.md
deleted file mode 100644
index 0fa0af868afe0255cecd2f509f9007694cdcdea5..0000000000000000000000000000000000000000
--- a/docs/site/blog/2021-03-28-hi.md
+++ /dev/null
@@ -1,11 +0,0 @@
----
-slug: Hi
-title: Hello
-author: Sergey Yakubov
-author_title: DESY IT
-<!-- author_url: https://github.com/wgao19 -->
-<!-- author_image_url: https://avatars1.githubusercontent.com/u/2055384?v=4 -->
-tags: [asapo]
----
-
-Welcome to the new ASAP::O Project site
\ No newline at end of file
diff --git a/docs/site/changelog/2021-02-09-20.12.0.md b/docs/site/changelog/2021-02-09-20.12.0.md
new file mode 100644
index 0000000000000000000000000000000000000000..26a4076c994aec547918e7c179434510d801ae30
--- /dev/null
+++ b/docs/site/changelog/2021-02-09-20.12.0.md
@@ -0,0 +1,43 @@
+---
+title: Version 20.12.0
+author: Sergey Yakubov
+author_title: DESY IT
+tags: [release]
+---
+
+# Changelog for version 20.12.0
+
+FEATURES
+* implemented the possibility to send data without writing to the database (no need for consecutive indexes, etc., but such data cannot be consumed later)
+* allow returning incomplete datasets (without an error if one sets a minimum dataset size, otherwise with a "partial data" error)
+
+IMPROVEMENTS
+* Consumer API - change behavior of GetLast/get_last - do not change the current pointer after the call
+* Consumer API - add interrupt_current_operation to allow interrupting (from a separate thread) a long-running consumer operation
+* Producer API - return original data in the callback payload
+* Producer API - allow setting queue limits (number of pending requests and/or max memory); reject new requests once the limits are reached
+* build rpm, deb and exe packages for client libs
+
+BREAKING CHANGES
+* Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return a dictionary with 'id', 'expected_size', 'content' fields, not a tuple (id, content) as before
+* Consumer API - remove the group_id argument from the get_last/get_by_id/get_last_dataset/get_dataset_by_id functions
+* Producer API - changed the meaning of subsets (subset_id is replaced with dataset_substream, which is now the id of the image within a subset (e.g. the module number for a multi-module detector)); message_id is now a global id of multi-set data (e.g. a multi-image id)
+ #### renaming - general
+* stream -> data_source, substream -> stream
+* use milliseconds everywhere for timeouts/delays
+* use the term `message` for the blob of information we send around; rename related structs, parameters, ...
+* C++ - get rid of duplicate functions with a default stream
+ #### renaming - Producer API
+* SendData/send_data -> Send/send
+* SendXX/send_xx -> swap parameters (stream to the end)
+* id_in_subset -> dataset_substream
+* subset_size -> dataset_size (and in general replace subset with dataset)
+ #### renaming - Consumer API
+* broker -> consumer
+* SetLastReadMarker/set_lastread_marker -> swap arguments
+* GetUnacknowledgedTupleIds/get_unacknowledged_tuple_ids -> GetUnacknowledgedMessages/get_unacknowledged_messages
+* GetLastAcknowledgedTulpeId/get_last_acknowledged_tuple_id -> GetLastAcknowledgedMessage/get_last_acknowledged_message
+* GetUnacknowledgedMessages -> swap parameters (stream to the end)
+
+BUG FIXES
+* fix a memory leak in the Python consumer library (led to problems when creating many consumer instances)
diff --git a/docs/site/changelog/2021-03-27-21.03.0.md b/docs/site/changelog/2021-03-27-21.03.0.md
new file mode 100644
index 0000000000000000000000000000000000000000..afb015b0851e0bacab4b0960b0ecef70438d6392
--- /dev/null
+++ b/docs/site/changelog/2021-03-27-21.03.0.md
@@ -0,0 +1,20 @@
+---
+title: Version 21.03.0
+author: Sergey Yakubov
+author_title: DESY IT
+tags: [release]
+---
+
+# Changelog for version 21.03.0
+
+IMPROVEMENTS
+* Producer API - queue limits in Python; for C++, return original data in the error's custom data
+* Consumer API - add GetCurrentDatasetCount/get_current_dataset_count function with an option to include or exclude incomplete datasets
+* Consumer API - GetStreamList/get_stream_list can now filter finished/unfinished streams
+* Producer/Consumer API - the StreamInfo structure/Python dictionary includes more information (whether the stream is finished, ...)
+* Switch to JWT tokens (a token has more symbols, an expiration time, can be revoked, and there are two types of tokens - with read/write access rights)
+* Improved versioning. Producer/Consumer API - introduce GetVersionInfo/get_version_info, compatibility check between clients and server
+
+BREAKING CHANGES
+* Consumer API (C++ only) - GetStreamList now has an extra argument StreamFilter
+* Consumer/Producer libraries need to be updated due to protocol changes
diff --git a/docs/site/changelog/2021-04-28-21.03.1.md b/docs/site/changelog/2021-04-28-21.03.1.md
new file mode 100644
index 0000000000000000000000000000000000000000..f1e4bca1a2d9458bed61e591bce3fac172e4fb61
--- /dev/null
+++ b/docs/site/changelog/2021-04-28-21.03.1.md
@@ -0,0 +1,12 @@
+---
+title: Version 21.03.1
+author: Sergey Yakubov
+author_title: DESY IT
+tags: [release]
+---
+
+# Changelog for version 21.03.1
+
+BUG FIXES
+* Core services: fix LDAP authorization for raw data type Producers
+
diff --git a/docs/site/changelog/2021-05-10-21.03.2.md b/docs/site/changelog/2021-05-10-21.03.2.md
new file mode 100644
index 0000000000000000000000000000000000000000..a6388c535d56e9638b4679ecf30f745901393b8d
--- /dev/null
+++ b/docs/site/changelog/2021-05-10-21.03.2.md
@@ -0,0 +1,19 @@
+---
+title: Version 21.03.2
+author: Sergey Yakubov
+author_title: DESY IT
+tags: [release]
+---
+
+# Changelog for version 21.03.2
+
+FEATURES
+* implemented the possibility to delete a stream (only metadata, not files yet)
+
+IMPROVEMENTS
+* Consumer API - retry file delivery/reading with a timeout (useful when a file arrives after its metadata was ingested, e.g. due to a slow NFS transfer, ...)
+
+BUG FIXES
+* Consumer API: fix a race condition in GetStreamList/get_stream_list
+* Producer API: fix a segfault in send_stream_finished_flag
+* Producer API: fix a deadlock in the producer timeout
diff --git a/docs/site/changelog/2021-05-18-21.03.3.md b/docs/site/changelog/2021-05-18-21.03.3.md
new file mode 100644
index 0000000000000000000000000000000000000000..b2d3aace44700c9b54706fc1e8eb2470457b10bb
--- /dev/null
+++ b/docs/site/changelog/2021-05-18-21.03.3.md
@@ -0,0 +1,12 @@
+---
+title: Version 21.03.3
+author: Sergey Yakubov
+author_title: DESY IT
+tags: [release]
+---
+
+# Changelog for version 21.03.3
+
+BUG FIXES
+* Consumer API: fix the returned error type when sending an acknowledgement a second time
+* Producer API: fix GetStreamInfo/stream_info and GetLastStream/last_stream for datasets
diff --git a/docs/site/changelog/2021-07-07-21.06.0.md b/docs/site/changelog/2021-07-07-21.06.0.md
new file mode 100644
index 0000000000000000000000000000000000000000..56f0c45c86ca1cf937de702b8c719f4490f56dd0
--- /dev/null
+++ b/docs/site/changelog/2021-07-07-21.06.0.md
@@ -0,0 +1,20 @@
+---
+title: Version 21.06.0
+author: Sergey Yakubov
+author_title: DESY IT
+tags: [release]
+---
+
+FEATURES
+* Consumer API: C client
+* Producer API: an option to automatically generate message ids (use sparingly, reduced performance possible)
+
+IMPROVEMENTS
+* Consumer/Producer API - allow any characters in source/stream/group names
+* Consumer/Producer API - introduce stream metadata
+* Consumer API - an option to auto-discover the data folder when the consumer client uses the file transfer service (has_filesystem=False)
+* improved build procedure - shared libraries, added pkg-config and cmake config for asapo clients
+
+BUG FIXES
+* Consumer API: multiple consumers from the same group receive a stream finished error
+* Consumer API: return ServiceUnavailable instead of Unauthorized in case an authorization service is unreachable
diff --git a/docs/site/docs/compare-to-others.md b/docs/site/docs/compare-to-others.md
index d8bf7df7292e35c13cd32a13f63525890f252c1e..97e0bd7fc30ae8166e3493742164a3bb6044fc31 100644
--- a/docs/site/docs/compare-to-others.md
+++ b/docs/site/docs/compare-to-others.md
@@ -46,4 +46,4 @@ In the table below we compare the approaches from different points of view and i
 | need to change user code | No | Yes | Yes | Yes |
 | parallelisation | Yes (if user software allows that) | Not out of the box | Not out of the box | Yes |
 | legacy applications | Yes | No (wrapper could be a workaround) | No (wrapper could be a workaround) | No (wrapper could be a workaround) |
-| transparent restart/continuation of simulations in case e.g. worker process crashes, also for parallel simulations | Not out of the box | Yes | No | Yes |
\ No newline at end of file
+| transparent restart/continuation of simulations in case e.g. worker process crashes, also for parallel simulations | Not out of the box | Yes | No | Yes |
diff --git a/docs/site/docs/cookbook/acknowledgements.mdx b/docs/site/docs/cookbook/acknowledgements.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..96b70ad30cb657b04dd2d55844cc8a4e3a1e5b51
--- /dev/null
+++ b/docs/site/docs/cookbook/acknowledgements.mdx
@@ -0,0 +1,38 @@
+---
+title: Acknowledgements
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+While consuming the messages, we can issue acknowledgements to denote that a message was (or was not) processed successfully.
+
+The following example produces 10 sample messages and then acknowledges all of them except for messages 3, 5 and 7. Message #3 receives a negative acknowledgement, which puts it back in the stream for repeated processing. On the second attempt message #3 gets acknowledged. Messages 5 and 7 remain unacknowledged.
+
+Only the acknowledgement-relevant parts are explained here. Look at the corresponding examples to learn about producers and consumers in detail.
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/acknowledgements.py"
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/acknowledgements.cpp"
+```
+
+</TabItem>
+</Tabs>
+
+The output will show the order in which the messages receive their acknowledgements. You may notice that the second acknowledgement of message #3 happens with a delay, which was chosen deliberately. The unacknowledged messages are retrieved separately at the end, after the consumer timeout.
diff --git a/docs/site/docs/cookbook/datasets.mdx b/docs/site/docs/cookbook/datasets.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..0c8cf6564169ddc6873e4d925d648181d24aafdb
--- /dev/null
+++ b/docs/site/docs/cookbook/datasets.mdx
@@ -0,0 +1,66 @@
+---
+title: Datasets
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+The messages in a stream can be multi-parted. If you have several producers (e.g. sub-detectors) that produce several parts of a single message, you can use datasets to assemble a single message from several parts.
+
+Only the dataset-relevant parts are explained here; a minimal sketch of the dataset-specific header fields comes first, and the full producer and consumer examples follow. Look at the corresponding examples to learn about producers and consumers in detail.
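+
+As a quick orientation, here is a minimal sketch (not one of the shipped examples) of how a producer marks a message as one part of a dataset. It assumes that the C++ `asapo::MessageHeader` accepts the user metadata, the dataset substream and the dataset size as trailing fields - the consumer example below reads exactly these fields back. The helper name and the part count are illustrative.
+
+```cpp
+#include "asapo/asapo_producer.h"
+#include <cstring>
+#include <string>
+
+// Hypothetical helper: send one part of a three-part dataset message.
+// 'producer' is an already-created asapo::Producer (see the producer example).
+void SendDatasetPart(asapo::Producer* producer, uint64_t message_id, uint64_t part) {
+    const uint64_t kParts = 3; // assumed number of parts (e.g. detector modules)
+    std::string to_send = "data of part " + std::to_string(part);
+    auto send_size = to_send.size() + 1;
+    auto buffer = asapo::MessageData(new uint8_t[send_size]);
+    memcpy(buffer.get(), to_send.c_str(), send_size);
+
+    // id, size and filename as for a plain message; the last two fields declare
+    // this message to be part 'part' of a dataset with 'kParts' parts in total
+    asapo::MessageHeader header{message_id, send_size,
+                                "processed/test_file_" + std::to_string(part),
+                                "", part, kParts};
+    producer->Send(header, std::move(buffer), asapo::kDefaultIngestMode, "default", nullptr);
+}
+```
+
+All parts sent with the same message id are then delivered to the consumer together as one dataset, as shown below.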
+
+## Dataset Producer
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/produce_dataset.py"
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/produce_dataset.cpp"
+```
+
+</TabItem>
+</Tabs>
+
+You should see the "successfully sent" message in the logs, and the file should appear in the corresponding folder (by default in ```/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test```).
+
+## Dataset Consumer
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/consume_dataset.py"
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/consume_dataset.cpp"
+```
+
+</TabItem>
+</Tabs>
+
+
+The details about the received dataset should appear in the logs, together with the message "stream finished" (if the "finished" flag was sent for the stream). The "stream ended" message will appear for non-finished streams, but may also mean that the stream does not exist (or was deleted).
diff --git a/docs/site/docs/cookbook/metadata.mdx b/docs/site/docs/cookbook/metadata.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..c88c297779075baa256bfe8954befee43513e691
--- /dev/null
+++ b/docs/site/docs/cookbook/metadata.mdx
@@ -0,0 +1,38 @@
+---
+title: Metadata
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+You can also store any custom metadata with your beamtime, stream, and each message. This tutorial shows how to store, update and access this metadata. The metadata is stored as JSON, and any valid JSON structure is supported.
+
+Only the metadata-relevant parts are explained here. Look at the corresponding examples to learn about producers and consumers in detail.
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/metadata.py"
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+Since C++ doesn't have built-in JSON support, you'd have to use third-party libraries if you want JSON parsing. In this tutorial we won't use any JSON parsing and will treat JSONs as regular strings. Please note that ASAP::O only accepts valid JSON, and providing invalid input will result in an error.
+
+```cpp content="./examples/cpp/metadata.cpp"
+```
+
+</TabItem>
+</Tabs>
+
+The output will show the metadata retrieved from the beamtime, stream and message.
diff --git a/docs/site/docs/cookbook/next_stream.mdx b/docs/site/docs/cookbook/next_stream.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..bec38c88bea9a850fa6cd609934c62978dc6eedf
--- /dev/null
+++ b/docs/site/docs/cookbook/next_stream.mdx
@@ -0,0 +1,36 @@
+---
+title: Stream Finishing
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+When all the data in a stream has been sent, the stream can be finished, and it is possible to set a "next stream" to follow up the first one. This tutorial shows how several streams can be chained together in a single consumer by using stream finishing.
+
+Only the stream-chaining-relevant parts are explained here. Look at the corresponding examples to learn about producers and consumers in detail.
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/next_stream.py"
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/next_stream.cpp"
+```
+
+</TabItem>
+</Tabs>
+
+The output will show the messages being consumed from the streams in order: first the ```default``` stream, then ```next```.
diff --git a/docs/site/docs/cookbook/overview.md b/docs/site/docs/cookbook/overview.md
new file mode 100644
index 0000000000000000000000000000000000000000..97db96a408a253119c2091af786a8a3bd3820726
--- /dev/null
+++ b/docs/site/docs/cookbook/overview.md
@@ -0,0 +1,13 @@
+---
+title: Code Examples Overview
+---
+
+Here you can find code examples for various common ASAPO use cases. Make sure that the ASAP::O instance and client libraries are properly installed; see the [Getting Started page](../) for details.
+
+For the most basic use case, see the [Simple Producer](simple-producer) and [Simple Consumer](simple-consumer). There are also basic examples of CMake and Makefile configurations for client compilation.
+
+The API documentation can be found [here](http://asapo.desy.de/cpp) (for C++) or [here](http://asapo.desy.de/python) (for Python).
+
+:::tip
+You can see more examples in the ASAPO [source code](https://stash.desy.de/projects/ASAPO/repos/asapo/browse/examples)
+:::
diff --git a/docs/site/docs/cookbook/query.mdx b/docs/site/docs/cookbook/query.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..b1d5931938da81ce9a09affa4981d99b1115e593
--- /dev/null
+++ b/docs/site/docs/cookbook/query.mdx
@@ -0,0 +1,36 @@
+---
+title: Message query
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+Messages in streams can be retrieved based on their metadata. Both the technical information (e.g. ID or timestamp) and the user metadata (see [this tutorial](metadata) for details) can be used to make a query. In this tutorial several example queries are shown. Standard SQL syntax is used.
+
+Only the query-relevant parts are explained here. Look at the corresponding examples to learn about producers and consumers in detail.
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/query.py"
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/query.cpp"
+```
+
+</TabItem>
+</Tabs>
+
+The output will show the message selection together with the conditions used for selection.
diff --git a/docs/site/docs/cookbook/simple-consumer.mdx b/docs/site/docs/cookbook/simple-consumer.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..41577dfd892456f9ed7f2db0550f13de490246ab
--- /dev/null
+++ b/docs/site/docs/cookbook/simple-consumer.mdx
@@ -0,0 +1,72 @@
+---
+title: Simple Consumer
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+This example shows how to consume a message. It also shows how to organize a simple consuming loop and how to extract metadata.
+
+A special access token is needed to create a consumer. For the purpose of this tutorial a special "test" token is used. It will only work for the beamtime called "asapo_test". The sketch below shows where the token goes; the full examples follow.
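+
+Here is a minimal sketch (condensed from the full examples that follow, with a placeholder for the token string) of how the token enters the picture: it is passed via `SourceCredentials` when the consumer is created. The endpoint and path are the standalone-instance defaults used throughout this tutorial.
+
+```cpp
+#include "asapo/asapo_consumer.h"
+#include <cstdlib>
+#include <string>
+
+int main() {
+    asapo::Error err;
+    // the JWT test token; copy it from the full example below
+    std::string token = "<test token>";
+    auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed,
+                                                "asapo_test", "", "test_source", token};
+    auto consumer = asapo::ConsumerFactory::CreateConsumer(
+        "localhost:8400",
+        "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test",
+        true, // the path above is accessible locally
+        credentials, &err);
+    return err ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+```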
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+    { label: 'C', value: 'c', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/consume.py"
+```
+
+Execute it with python3
+
+```
+$ python3 consume.py
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/consume.cpp"
+```
+
+Compile e.g. using CMake and execute. You might need to point CMake (via CMAKE_PREFIX_PATH) to the asapo installation and to the curl library if they are installed in a non-standard location.
+
+```shell content="./examples/cpp/CMakeLists.txt" snippetTag="#consumer"
+```
+
+```
+$ cmake . && make
+$ ./asapo-consume
+```
+
+</TabItem>
+
+<TabItem value="c">
+
+```c content="./examples/c/consume.c"
+```
+
+Compile e.g. using make and pkg-config (although we recommend CMake - see the C++ section) and execute. This example assumes asapo is installed to /opt/asapo; adjust accordingly.
+
+```makefile content="./examples/c/Makefile" snippetTag="#consumer"
+```
+
+```
+$ make
+$ ./asapo-consume
+```
+
+
+</TabItem>
+
+</Tabs>
+
+The details about the received message should appear in the logs, together with the message "stream finished" (if the "finished" flag was sent for the stream). The "stream ended" message will appear for non-finished streams, but may also mean that the stream does not exist (or was deleted).
diff --git a/docs/site/docs/cookbook/simple-pipeline.mdx b/docs/site/docs/cookbook/simple-pipeline.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..eb4ef682315fd87ba8433022516e445a5347f5f9
--- /dev/null
+++ b/docs/site/docs/cookbook/simple-pipeline.mdx
@@ -0,0 +1,34 @@
+---
+title: Simple Pipeline
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+A consumer and a producer can be combined to create pipelines. Look at the corresponding examples to learn about producers and consumers in detail. Only the pipeline-related parts are explained here.
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/pipeline.py"
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/pipeline.cpp"
+```
+
+</TabItem>
+</Tabs>
+
+The details about the received message should appear in the logs, together with the message "stream finished" (if the "finished" flag was sent for the stream). The "stream ended" message will appear for non-finished streams, but may also mean that the stream does not exist (or was deleted). The processed file should appear in the corresponding folder (by default in ```/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test```).
diff --git a/docs/site/docs/cookbook/simple-producer.mdx b/docs/site/docs/cookbook/simple-producer.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..e31f65816cb4258568147d0902120d60baa91739
--- /dev/null
+++ b/docs/site/docs/cookbook/simple-producer.mdx
@@ -0,0 +1,50 @@
+---
+title: Simple Producer
+---
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+This example produces a simple message. Use this if you don't need any special beamtime/message metadata. A condensed sketch comes first; the full examples follow.
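+
+The essence of the full examples below fits in a few lines. Here is a condensed sketch (same endpoint and beamtime as in this tutorial, callback and error handling omitted for brevity):
+
+```cpp
+#include "asapo/asapo_producer.h"
+#include <cstdlib>
+#include <cstring>
+#include <string>
+
+int main() {
+    asapo::Error err;
+    auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed,
+                                                "asapo_test", "", "test_source", ""};
+    auto producer = asapo::Producer::Create("localhost:8400", 1,
+                                            asapo::RequestHandlerType::kTcp,
+                                            credentials, 60000, &err);
+    if (err) return EXIT_FAILURE;
+
+    // copy the payload into an asapo-owned buffer
+    std::string to_send = "hello";
+    auto send_size = to_send.size() + 1;
+    auto buffer = asapo::MessageData(new uint8_t[send_size]);
+    memcpy(buffer.get(), to_send.c_str(), send_size);
+
+    // message id 1; filenames of processed data must start with "processed/"
+    asapo::MessageHeader header{1, send_size, "processed/test_file"};
+    producer->Send(header, std::move(buffer), asapo::kDefaultIngestMode, "default", nullptr);
+
+    producer->WaitRequestsFinished(2000); // flush the queue before exiting
+    return EXIT_SUCCESS;
+}
+```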
+
+<Tabs
+  groupId="language"
+  defaultValue="python"
+  values={[
+    { label: 'Python', value: 'python', },
+    { label: 'C++', value: 'cpp', },
+  ]
+}>
+<TabItem value="python">
+
+```python content="./examples/python/produce.py"
+```
+
+Execute it with python3
+
+```shell
+$ python3 produce.py
+```
+
+</TabItem>
+
+<TabItem value="cpp">
+
+```cpp content="./examples/cpp/produce.cpp"
+```
+
+Compile e.g. using CMake and execute. You might need to point CMake (via CMAKE_PREFIX_PATH) to the asapo installation and to the curl library if they are installed in a non-standard location.
+
+```shell content="./examples/cpp/CMakeLists.txt" snippetTag="#producer"
+```
+
+```shell
+$ cmake . && make
+$ ./asapo-produce
+```
+
+</TabItem>
+</Tabs>
+
+You should see the "successfully sent" message in the logs, and the file should appear in the corresponding folder (by default in ```/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test```).
diff --git a/docs/site/docs/getting-started.mdx b/docs/site/docs/getting-started.mdx
index 05d757e34097a6625cfcb5a2f4ac0d2c956edebd..726f6f98226b7b65161bf2327a0bf8d4e2af1445 100644
--- a/docs/site/docs/getting-started.mdx
+++ b/docs/site/docs/getting-started.mdx
@@ -7,9 +7,9 @@ slug: /
 import Tabs from '@theme/Tabs';
 import TabItem from '@theme/TabItem';
 
-## Step 1: Start ASAPO services {#step-1}
+## Start ASAPO services {#step-1}
 
-If you already have running ASAPO services and know the endpoint, you can skip to the [step 2](#step-2).
+If you already have running ASAPO services and know the endpoint, you don't need this step and can go straight to [Client Libraries](#step-2).
 
 Otherwise, for testing purposes one can start ASAPO services in a standalone mode (this is not recommended for production deployment).
 
@@ -17,7 +17,7 @@ Otherwise, for testing purposes one can start ASAPO services in a standalone mod
 The easiest way is to use a Docker container. So, make sure Docker is installed and you have necessary permissions to use it.
 Please note that this will only work on a Linux machine. Also please note that ASAPO needs some ports to be available. You can check the list
-[here](https://stash.desy.de/projects/ASAPO/repos/asapo/browse/deploy/asapo_services/scripts/asapo.auto.tfvars.in?at=@ASAPO_VERSION_IN_DOCS@#37).
+[here](https://stash.desy.de/projects/ASAPO/repos/asapo/browse/deploy/asapo_services/scripts/asapo.auto.tfvars.in#37).
 
 Now, depending on how your Docker daemon is configured (if it uses a
 unix socket or a tcp port for communications)
@@ -32,14 +32,14 @@ unix socket or a tcp port for communications)
 }>
 <TabItem value="unix">
 
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/start_asapo_socket.sh"
+```shell content="./examples/start_asapo_socket.sh"
 ```
 
 </TabItem>
 
 <TabItem value="tcp">
 
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/start_asapo_tcp.sh"
+```shell content="./examples/start_asapo_tcp.sh"
 ```
 
 </TabItem>
@@ -70,7 +70,16 @@ We have a running instance for processing data collected during experiments. Ple
 
 :::
 
-## Step 2: Install client libraries {#step-2}
+### Services shutdown
+
+Once you're done with your instance of ASAPO, you might want to gracefully shut down the running services. If you don't, your machine will become cluttered with unused Docker images.
+
+```shell content="./examples/cleanup.sh"
+```
+
+<br/><br/>
+
+## Install client libraries {#step-2}
 
 Now you can install Python packages or C++ libraries for ASAPO Producer and Consumer API (you need to be in DESY intranet to access files).
@@ -84,161 +93,25 @@ Now you can install Python packages or C++ libraries for ASAPO Producer and Cons
 }>
 <TabItem value="python-pip">
 
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/install_python_clients_pip.sh" snippetTag="#snippet1"
+```shell content="./examples/install_python_clients_pip.sh" snippetTag="#snippet1"
 ```
 
 </TabItem>
 <TabItem value="python-packages">
 
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/install_python_clients_pkg.sh"
-```
-
-</TabItem>
-<TabItem value="cpp">
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/install_cpp_clients.sh"
-```
-
-</TabItem>
-</Tabs>
-
-## Step 3: Produce a message {#step-3}
-
-<Tabs
-  groupId="language"
-  defaultValue="python"
-  values={[
-    { label: 'Python', value: 'python', },
-    { label: 'C++', value: 'cpp', },
-  ]
-}>
-<TabItem value="python">
-
-Now you can write a Producer client (API documentation [here](http://asapo.desy.de/python/producer.html)).
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/python/produce.py"
-```
-
-Execute it with python3
-
-```
-$ python3 produce.py
-```
-
-</TabItem>
-
-<TabItem value="cpp">
-
-Now you can write a Producer client (API documentation [here](http://asapo.desy.de/cpp/producer)).
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/cpp/produce.cpp"
-```
-
-Compile e.g. using CMake and execute. You might need to point cmake (with CMAKE_PREFIX_PATH) to asapo installation and curl library if installed to non-standard location.
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/cpp/CMakeLists.txt" snippetTag="#producer"
-```
-
-```
-$ cmake . && make
-$ ./asapo-produce
-```
-
-
-</TabItem>
-</Tabs>
-
-the output should look like
-
-```
-{"time":"***** *****","source":"producer_api","level":"info","message":"authorized connection to receiver at ****:****"}
-successfuly sent: {"id": 1, "buffer": "test_file"}
-```
-
-
-## Step 4: Consume a message {#step-4}
-
-A consumer data that reads the message ingested during step 3. Note that a token is needed to work with data. In production, the token is provided during start of the beamtime.
-
-<Tabs
-  groupId="language"
-  defaultValue="python"
-  values={[
-    { label: 'Python', value: 'python', },
-    { label: 'C++', value: 'cpp', },
-    { label: 'C', value: 'c', },
-  ]
-}>
-<TabItem value="python">
-
-Complete API documentation [here](http://asapo.desy.de/python/consumer.html)
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/python/consume.py"
-```
-
-Execute it with python3
-
-```
-$ python3 consumer.py
+```shell content="./examples/install_python_clients_pkg.sh"
 ```
 
 </TabItem>
 <TabItem value="cpp">
 
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/cpp/consume.cpp"
-```
-
-Compile e.g. using CMake and execute. You might need to point cmake (with CMAKE_PREFIX_PATH) to asapo installation and curl library if installed to non-standard location.
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/cpp/CMakeLists.txt" snippetTag="#consumer"
-```
-
-```
-$ cmake . && make
-$ ./asapo-consume
+```shell content="./examples/install_cpp_clients.sh"
 ```
 
 </TabItem>
-<TabItem value="c">
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/c/consume.c"
-```
-
-Compile e.g. using Makefile and pkg-config (although we recommend CMake - see C++ section) and execute. This example assumes asapo is installed to /opt/asapo. Adjust correspondingly.
-
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/c/Makefile" snippetTag="#consumer"
-```
-
-```
-$ make
-$ ./asapo-consume
-```
-
-
-</TabItem>
-
-</Tabs>
-
-the output should look like
-
-```
-id: 1
-file name: processed/test_file
-file content: hello
-stream deleted
-```
-
-## Step 5: Clean-up
+## Code examples
 
-Optionally, last step is to stop ASAPO services and remove files:
+Please refer to the [Code Examples](cookbook/overview) section to see code snippets for various usage scenarios.
 
-```shell content="@ASAPO_EXAMPLES_DIR@/getting_started/cleanup.sh"
-```
-
-<br/><br/>
-
-:::tip
-You can see more examples in ASAPO [source code](https://stash.desy.de/projects/ASAPO/repos/asapo/browse/examples?at=@ASAPO_VERSION_IN_DOCS@)
-:::
diff --git a/docs/site/docs/p02.1.md b/docs/site/docs/p02.1.md
index 1605ec707f4e7a647ae1a2d601b5e2aa72a827d3..271b2bf10a91d2ecf398e2a571b49b0df874bbe7 100644
--- a/docs/site/docs/p02.1.md
+++ b/docs/site/docs/p02.1.md
@@ -40,4 +40,4 @@ In the following, ASAPO specific details for the pipeline of a single detector a
 8. One "nexus-write" worker per detector reads the analyzed data from the "analyzed" stream and writes it into one NeXus file per substream. The filename is constructed from the name of the stream and substream the data belongs to. The index within a substream corresponds to the index within the HDF5 dataset.
 9. The data acquisition software stores all scalar data and all additional scan-metadata in a master NeXus file that links to the NeXus files produced by the ASAPO workers.
 10. The viewer listens to all streams and parses the metadata to create a continuously updated tree view of all available data. Clicking on an item uses get_by_id to retrieve the actual data. A "live" mode automatically retrieves the latest data.
- 
\ No newline at end of file
+ 
diff --git a/docs/site/docusaurus.config.js b/docs/site/docusaurus.config.js
index fb16fb6207c0235d2059335406743fe75410f4ed..7411889d76f2a52ca38d83f947cd101904f82255 100644
--- a/docs/site/docusaurus.config.js
+++ b/docs/site/docusaurus.config.js
@@ -13,9 +13,6 @@ module.exports = {
   favicon: 'img/favicon.ico',
   organizationName: 'DESY', // Usually your GitHub org/user name.
   projectName: 'ASAPO', // Usually your repo name.
-  customFields: {
-    version: '@ASAPO_VERSION_IN_DOCS@',
-  },
   plugins: [path.resolve(__dirname, 'plugins/webpackconf/src/index.js')],
   themeConfig: {
     navbar: {
@@ -31,7 +28,7 @@ module.exports = {
         label: 'Docs',
         position: 'left',
       },
-      {to: 'blog', label: 'Blog', position: 'left'},
+      {to: 'blog', label: 'Changelog', position: 'left'},
       {
         label: 'API',
         position: 'left', // or 'right'
@@ -57,7 +54,7 @@ module.exports = {
         docsPluginId: 'default',
       },
       {
-        href: 'https://stash.desy.de/projects/ASAPO/repos/asapo/browse?at=@ASAPO_VERSION_IN_DOCS@/',
+        href: 'https://stash.desy.de/projects/ASAPO/repos/asapo/browse/',
         label: 'BitBucket',
         title: 'BitBucket',
         position: 'right',
@@ -83,8 +80,9 @@ module.exports = {
       },
     },
     blog: {
-      showReadingTime: true,
-      // Please change this to your repo.
+      showReadingTime: false,
+      path: 'changelog',
+      blogSidebarTitle: 'Versions'
     },
     theme: {
       customCss: require.resolve('./src/css/custom.css'),
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/c/consume.c b/docs/site/examples/c/consume.c
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/c/consume.c
rename to docs/site/examples/c/consume.c
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/cleanup.sh b/docs/site/examples/cleanup.sh
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/cleanup.sh
rename to docs/site/examples/cleanup.sh
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/CMakeLists.txt b/docs/site/examples/cpp/CMakeLists.txt
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/CMakeLists.txt
rename to docs/site/examples/cpp/CMakeLists.txt
diff --git a/docs/site/examples/cpp/acknowledgements.cpp b/docs/site/examples/cpp/acknowledgements.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..1f63d3f10c2780e1a4b942d3d33cbbb3cb453df7
--- /dev/null
+++ b/docs/site/examples/cpp/acknowledgements.cpp
@@ -0,0 +1,128 @@
+#include "asapo/asapo_producer.h"
+#include "asapo/asapo_consumer.h"
+#include <cstring>
+#include <iostream>
+#include <set>
+
+void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) {
+    if (err && err != asapo::ProducerErrorTemplates::kServerWarning) {
+        std::cerr << "error during send: " << err << std::endl;
+    } else if (err) {
+        std::cout << "warning during send: " << err << std::endl;
+    } else {
+        std::cout << "successfully sent " << payload.original_header.Json() << std::endl;
+    }
+}
+
+void exit_if_error(std::string error_string, const asapo::Error& err) {
+    if (err) {
+        std::cerr << error_string << err << std::endl;
+        exit(EXIT_FAILURE);
+    }
+}
+
+int main(int argc, char* argv[]) {
+    asapo::Error err;
+
+    auto endpoint = "localhost:8400";
+    auto beamtime = "asapo_test";
+
+    auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
+                 "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
+                 "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG"
+                 "VzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGV"
+                 "zIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4"
+                 "t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU";
+
+    auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test";
+
+    auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token};
+
+    auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err);
+    exit_if_error("Cannot start producer", err);
+
+    producer->SetLogLevel(asapo::LogLevel::Error);
+
+    err = producer->DeleteStream("default", 1000, asapo::DeleteStreamOptions{true, true});
+    exit_if_error("Cannot delete stream", err);
+
+    // let's start with producing a sample of 10 simple messages
+    for (uint64_t i = 1; i <= 10; i++) {
+        std::string to_send = "message#" + std::to_string(i);
+        auto send_size = to_send.size() + 1;
+        auto buffer = asapo::MessageData(new uint8_t[send_size]);
+        memcpy(buffer.get(), to_send.c_str(), send_size);
+
+        asapo::MessageHeader message_header{i, send_size, "processed/test_file_" + std::to_string(i)};
+        err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend);
+        exit_if_error("Cannot send message", err);
+    }
+
+    err = producer->WaitRequestsFinished(2000);
+    exit_if_error("Producer exit on timeout", err);
+
+    auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, credentials, &err);
+    exit_if_error("Cannot start consumer", err);
+    consumer->SetTimeout(5000);
+    auto group_id = consumer->GenerateNewGroupId(&err);
+    exit_if_error("Cannot create group id", err);
+
+    asapo::MessageMeta mm;
+    asapo::MessageData data;
+
+    const std::set<int> ids{3, 5, 7};
+
+    // the flag to separate the first attempt for message #3
+    bool firstTryNegative = true;
+
+    do {
+        err = consumer->GetNext(group_id, &mm, &data, "default");
+
+        if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
+            std::cout << "stream finished" << std::endl;
+            break;
+        }
+
+        if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
+            std::cout << "stream ended" << std::endl;
+            break;
+        }
+        exit_if_error("Cannot get next record", err);
+
+        // acknowledge all the messages except the ones in the set
+        if (ids.find(mm.id) == ids.end()) {
+            std::cout << "Acknowledge the message #" << mm.id << std::endl;
+            consumer->Acknowledge(group_id, mm.id, "default");
+        }
+
+        // for message #3 we issue a negative acknowledgement, which puts it back in the stream;
+        // in this case it goes to the end of the stream
+        if (mm.id == 3) {
+            if (firstTryNegative) {
+                std::cout << "Negative acknowledgement of the message #" << mm.id << std::endl;
+                // negatively acknowledge with a redelivery delay of 2 seconds
+                consumer->NegativeAcknowledge(group_id, mm.id, 2000, "default");
+                firstTryNegative = false;
+            } else {
+                // on our second attempt we acknowledge the message
+                std::cout << "Second try of the message #" << mm.id << std::endl;
+                consumer->Acknowledge(group_id, mm.id, "default");
+            }
+        }
+    } while (1);
+
+    auto unacknowledgedMessages = consumer->GetUnacknowledgedMessages(group_id, 0, 0, "default", &err);
+    exit_if_error("Could not get list of messages", err);
+
+    for (size_t i = 0; i < unacknowledgedMessages.size(); i++) {
+        err = consumer->GetById(unacknowledgedMessages[i], &mm, &data, "default");
+        exit_if_error("Cannot get message", err);
+
+        std::cout << "Unacknowledged message: " << reinterpret_cast<char const*>(data.get()) << std::endl;
+        std::cout << "id: " << mm.id << std::endl;
+        std::cout << "file name: " << mm.name << std::endl;
+    }
+
+    return EXIT_SUCCESS;
+}
diff --git a/docs/site/examples/cpp/consume.cpp b/docs/site/examples/cpp/consume.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..a802034c998ea78b11a2829ff7e3d21372bba7b1
--- /dev/null
+++ b/docs/site/examples/cpp/consume.cpp
@@ -0,0 +1,94 @@
+#include "asapo/asapo_consumer.h"
+#include <iostream>
+
+
+void exit_if_error(std::string error_string, const asapo::Error& err) {
+    if (err) {
+        std::cerr << error_string << std::endl << err << std::endl;
+        exit(EXIT_FAILURE);
+    }
+}
+
+int main(int argc, char* argv[]) {
+    asapo::Error err;
+
+    auto endpoint = "localhost:8400";
+    auto beamtime = "asapo_test";
+
+    // test token. In production it is created during the start of the beamtime
+    auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
+                 "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
+                 "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG"
+                 "VzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGV"
+                 "zIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4"
+                 "t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU";
+
+    // set it according to your configuration
+    auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test";
+
+    auto credentials = asapo::SourceCredentials
+    {
+        asapo::SourceType::kProcessed, // should be kProcessed or kRaw; kProcessed writes to the core FS
+        beamtime,                      // the folder should exist
+        "",                            // can be empty or "auto" if beamtime_id is given
+        "test_source",                 // source
+        token                          // authorization token
+    };
+
+    auto consumer = asapo::ConsumerFactory::CreateConsumer
+                    (endpoint,
+                     path_to_files,
+                     true,        // true if path_to_files is accessible locally, false otherwise
+                     credentials, // same as for the producer
+                     &err);
+
+    exit_if_error("Cannot create consumer", err);
+    consumer->SetTimeout(5000); // how long to wait for a message on a non-finished stream
+
+    // you can get info about the streams in the beamtime
+    for (const auto& stream : consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err)) {
+        std::cout << "Stream name: " << stream.name << std::endl;
+        std::cout << "LastId: " << stream.last_id << std::endl;
+        std::cout << "Stream finished: " << stream.finished << std::endl;
+        std::cout << "Next stream: " << stream.next_stream << std::endl;
+    }
+
+    // several consumers can use the same group_id to process messages in parallel
+    auto group_id = consumer->GenerateNewGroupId(&err);
+    exit_if_error("Cannot create group id", err);
+
+    asapo::MessageMeta mm;
+    asapo::MessageData data;
+
+    do {
+        // GetNext is the main function to get messages from streams. You would normally call it in a loop.
+        // You can either manually compare mm.id to stream.last_id, or wait for the error to happen
+        err = consumer->GetNext(group_id, &mm, &data, "default");
+
+        if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
+            // all the messages in the stream were processed
+            std::cout << "stream finished" << std::endl;
+            break;
+        }
+
+        if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
+            // timeout on a non-finished stream, or a wrong or empty stream
+            std::cout << "stream ended" << std::endl;
+            break;
+        }
+
+        exit_if_error("Cannot get next record", err);
+
+        std::cout << "id: " << mm.id << std::endl;
+        std::cout << "file name: " << mm.name << std::endl;
+        std::cout << "message content: " << reinterpret_cast<char const*>(data.get()) << std::endl;
+    } while (1);
+
+    // you can delete the stream after consuming
+    err = consumer->DeleteStream("default", asapo::DeleteStreamOptions{true, true});
+    exit_if_error("Cannot delete stream", err);
+    std::cout << "stream deleted" << std::endl;
+
+    return EXIT_SUCCESS;
+}
diff --git a/docs/site/examples/cpp/consume_dataset.cpp b/docs/site/examples/cpp/consume_dataset.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..a5f8be96630de3d2713b863972dd9b33b187470e
--- /dev/null
+++ b/docs/site/examples/cpp/consume_dataset.cpp
@@ -0,0 +1,69 @@
+#include "asapo/asapo_consumer.h"
+#include <iostream>
+
+
+void exit_if_error(std::string error_string, const asapo::Error& err) {
+    if (err) {
+        std::cerr << error_string << std::endl << err << std::endl;
+        exit(EXIT_FAILURE);
+    }
+}
+
+int main(int argc, char* argv[]) {
+    asapo::Error err;
+
+    auto endpoint = "localhost:8400";
+    auto beamtime = "asapo_test";
+
+    auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
+                 "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
+                 "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG"
+                 "VzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGV"
+                 "zIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4"
+                 "t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU";
+
+    auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test";
+
+    auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token};
+
+    auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, credentials, &err);
+    exit_if_error("Cannot create consumer", err);
+    consumer->SetTimeout((uint64_t) 5000);
+
+    auto group_id = consumer->GenerateNewGroupId(&err);
+    exit_if_error("Cannot create group id", err);
+
+    asapo::DataSet ds;
+    asapo::MessageData data;
+
+    do {
+        ds = consumer->GetNextDataset(group_id, 0, "default", &err);
+
+        if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
+            std::cout << "stream finished" << std::endl;
+            break;
+        }
+
+        if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
+            std::cout << "stream ended" << std::endl;
+            break;
+        }
+        exit_if_error("Cannot get next record", err);
+
+        std::cout << "Dataset Id: " << ds.id << std::endl;
+
+        for (size_t i = 0; i < ds.content.size(); i++) {
+            err = consumer->RetrieveData(&ds.content[i], &data);
+            exit_if_error("Cannot get dataset content", err);
+
+            std::cout << "Part " << ds.content[i].dataset_substream << " out of " << ds.expected_size << std::endl;
+            std::cout << "message content: " << reinterpret_cast<char const*>(data.get()) << std::endl;
+        }
+    } while (1);
+
+    err = consumer->DeleteStream("default", asapo::DeleteStreamOptions{true, true});
+    exit_if_error("Cannot delete stream", err);
+
+    return EXIT_SUCCESS;
+}
diff --git a/docs/site/examples/cpp/metadata.cpp b/docs/site/examples/cpp/metadata.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..b64351fbe7433d8b6946cf52f75a2d25e7d5ced5
--- /dev/null
+++ b/docs/site/examples/cpp/metadata.cpp
@@ -0,0 +1,147 @@
+#include "asapo/asapo_producer.h"
+#include "asapo/asapo_consumer.h"
+#include <cstring>
+#include <iostream>
+
+void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) {
+    if (err && err != asapo::ProducerErrorTemplates::kServerWarning) {
+        std::cerr << "error during send: " << err << std::endl;
+    } else if (err) {
+        std::cout << "warning during send: " << err << std::endl;
+    } else {
+        std::cout << "successfully sent " << payload.original_header.Json() << std::endl;
+    }
+}
+
+void exit_if_error(std::string error_string, const asapo::Error& err) {
+    if (err) {
+        std::cerr << error_string << err << std::endl;
+        exit(EXIT_FAILURE);
+    }
+}
+
+int main(int argc, char* argv[]) {
+    asapo::Error err;
+
+    auto endpoint = "localhost:8400";
+    auto beamtime = "asapo_test";
+
+    auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
+                 "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
+                 "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG"
+                 "VzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGV"
+                 "zIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4"
+                 "t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU";
+
+    auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test";
+
+    auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token};
+
+    auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err);
+    exit_if_error("Cannot start producer", err);
+    producer->SetLogLevel(asapo::LogLevel::Error);
+
+    // sample beamtime metadata. You can add any data you want, with any level of complexity;
+    // in this example we use strings and ints, and one nested structure
+    auto beamtime_metadata = "{"
+                             "    \"name\": \"beamtime name\","
+                             "    \"condition\": \"beamtime condition\","
+                             "    \"intvalue1\": 5,"
+                             "    \"intvalue2\": 10,"
+                             "    \"structure\": {"
+                             "        \"structint1\": 20,"
+                             "        \"structint2\": 30"
+                             "    }"
+                             "}";
+
+    // send the metadata
+    // with this call the new metadata will completely replace the one that's already there
+    err = producer->SendBeamtimeMetadata(beamtime_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kReplace, true}, &ProcessAfterSend);
+    exit_if_error("Cannot send metadata", err);
+
+    // we can update the existing metadata if we want, by modifying the existing fields or adding new ones
+    auto beamtime_metadata_update = "{"
+                                    "    \"condition\": \"updated beamtime condition\","
+                                    "    \"newintvalue\": 15"
+                                    "}";
+
+    // send the metadata in the 'kUpdate' mode
+    err = producer->SendBeamtimeMetadata(beamtime_metadata_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, &ProcessAfterSend);
+    exit_if_error("Cannot send metadata", err);
+
+    // sample stream metadata
+    auto stream_metadata = "{"
+                           "    \"name\": \"stream name\","
+                           "    \"condition\": \"stream condition\","
+                           "    \"intvalue\": 44"
+                           "}";
+
+    // works the same way: for the initial set of the stream metadata we use 'kReplace';
+    // an update works exactly the same as for the beamtime, but here we will only do 'kReplace'
+    err = producer->SendStreamMetadata(stream_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kReplace, true}, "default", &ProcessAfterSend);
+    exit_if_error("Cannot send metadata", err);
+
+    // sample message metadata
+    auto message_metadata = "{"
+                            "    \"name\": \"message name\","
+                            "    \"condition\": \"message condition\","
+                            "    \"somevalue\": 55"
+                            "}";
+
+    std::string data_string = "hello";
+    auto send_size = data_string.size() + 1;
+    auto buffer = asapo::MessageData(new uint8_t[send_size]);
+    memcpy(buffer.get(), data_string.c_str(), send_size);
+
+    // the message metadata is sent together with the message itself
+    // in case of datasets each part has its own metadata
+    asapo::MessageHeader message_header{1, send_size, "processed/test_file", message_metadata};
+    err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend);
+    exit_if_error("Cannot send message", err);
+
+    err = producer->WaitRequestsFinished(2000);
+    exit_if_error("Producer exit on timeout", err);
+
+    auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, credentials, &err);
+    exit_if_error("Cannot start consumer", err);
+
+    // read the beamtime metadata
+    auto beamtime_metadata_read = consumer->GetBeamtimeMeta(&err);
+    exit_if_error("Cannot get metadata", err);
+
+    std::cout << "Updated beamtime metadata:" << std::endl << beamtime_metadata_read << std::endl;
+
+    // read the stream metadata
+    auto stream_metadata_read = consumer->GetStreamMeta("default", &err);
+    exit_if_error("Cannot get metadata", err);
+
+    std::cout << "Stream metadata:" << std::endl << stream_metadata_read << std::endl;
+
+    auto group_id = consumer->GenerateNewGroupId(&err);
+    exit_if_error("Cannot create group id", err);
+
+    asapo::MessageMeta mm;
+    asapo::MessageData data;
+
+    do {
+        err = consumer->GetNext(group_id, &mm, &data, "default");
+
+        if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
+            std::cout << "stream finished" << std::endl;
+            break;
+        }
+
+        if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
+            std::cout << "stream ended" << std::endl;
+            break;
+        }
+        exit_if_error("Cannot get next record", err);
+
+        std::cout << "Message #" << mm.id << std::endl;
+        // our custom metadata is stored inside the message metadata
+        std::cout << "Message metadata:" << std::endl << mm.metadata << std::endl;
+    } while (1);
+
+    return EXIT_SUCCESS;
+}
diff --git a/docs/site/examples/cpp/next_stream.cpp b/docs/site/examples/cpp/next_stream.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..85ae4602f3694b6c4eaa68914f16d366910fb40b
--- /dev/null
+++ b/docs/site/examples/cpp/next_stream.cpp
@@ -0,0 +1,121 @@
+#include "asapo/asapo_producer.h"
+#include "asapo/asapo_consumer.h"
+#include <algorithm>
+#include <cstring>
+#include <iostream>
+
+void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) {
+    if (err && err != asapo::ProducerErrorTemplates::kServerWarning) {
+        std::cerr << "error during send: " << err << std::endl;
+    } else if (err) {
+        std::cout << "warning during send: " << err << std::endl;
+    } else {
+        std::cout << "successfully sent " << payload.original_header.Json() << std::endl;
+    }
+}
+
+void exit_if_error(std::string error_string, const asapo::Error& err) {
+    if (err) {
+        std::cerr << error_string << err << std::endl;
+        exit(EXIT_FAILURE);
+    }
+}
+
+int main(int argc, char* argv[]) {
+    asapo::Error err;
+
+    auto endpoint = "localhost:8400";
+    auto beamtime = "asapo_test";
+
+    auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
+                 "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
+                 "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG"
+                 "VzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGV"
+                 "zIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4"
+                 "t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU";
+
+    auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test";
+
+    auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token};
+
+    auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err);
+    exit_if_error("Cannot start producer", err);
+
+    producer->SetLogLevel(asapo::LogLevel::Error);
+
+    // let's start with producing a sample of 10 simple messages
+    for (uint64_t i = 1; i <= 10; i++) {
+        std::string to_send = "content of the message #" + std::to_string(i);
+        auto send_size = to_send.size() + 1;
+        auto buffer = asapo::MessageData(new uint8_t[send_size]);
+        memcpy(buffer.get(), to_send.c_str(), send_size);
+
+        asapo::MessageHeader message_header{i, send_size, "processed/test_file_" + std::to_string(i)};
+        err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend);
+        exit_if_error("Cannot send message", err);
+    }
+
+    // finish the stream and set the next stream to be called 'next'
+    producer->SendStreamFinishedFlag("default", 10, "next", &ProcessAfterSend);
+
+    // populate the 'next' stream as well
+    for (uint64_t i = 1; i <= 5; i++) {
+        std::string to_send = "content of the message #" + std::to_string(i);
+        auto send_size = to_send.size() + 1;
+        auto buffer = asapo::MessageData(new uint8_t[send_size]);
+        memcpy(buffer.get(), to_send.c_str(), send_size);
+
+        asapo::MessageHeader message_header{i, send_size, "processed/test_file_next_" + std::to_string(i)};
+        err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "next", &ProcessAfterSend);
+        exit_if_error("Cannot send message", err);
+    }
+
+    // we leave the 'next' stream unfinished, but the chain of streams can be of any length
+
+    err = producer->WaitRequestsFinished(2000);
+    exit_if_error("Producer exit on timeout", err);
+
+    auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, credentials, &err);
+    exit_if_error("Cannot start consumer", err);
+    consumer->SetTimeout(5000);
+    auto group_id = consumer->GenerateNewGroupId(&err);
+    exit_if_error("Cannot create group id", err);
+
+    asapo::MessageMeta mm;
+    asapo::MessageData data;
+
+    // we start with the 'default' stream (the first one)
+    std::string stream_name = "default";
+
+    do {
+        err = consumer->GetNext(group_id, &mm, &data, stream_name);
+
+        if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
+            // when the stream finishes, we look for the info on the next stream
+            auto streams = consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err);
+            // first, we find the stream with our name in the list of streams
+            auto stream = std::find_if(streams.begin(), streams.end(),
+                                       [&stream_name](const asapo::StreamInfo& s) { return s.name == stream_name; });
+
+            // then we check whether the field 'next_stream' is set and not empty
+            if (stream != streams.end() && !stream->next_stream.empty()) {
+                // if so, we continue with the next stream
+                stream_name = stream->next_stream;
+                std::cout << "Changing stream to the next one: " << stream_name << std::endl;
+                continue;
+            } else {
+                // otherwise we stop
+                std::cout << "stream finished" << std::endl;
+                break;
+            }
+        }
+
+        if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
+            std::cout << "stream ended" << std::endl;
+            break;
+        }
+        exit_if_error("Cannot get next record", err);
+
+        std::cout << "Message #" << mm.id << ", message content: " << reinterpret_cast<char const*>(data.get()) << std::endl;
+    } while (1);
+
+    return EXIT_SUCCESS;
+}
diff --git a/docs/site/examples/cpp/pipeline.cpp b/docs/site/examples/cpp/pipeline.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..e399e6fe5dd62463f3eaf0e818067644ef4e528f
--- /dev/null
+++ b/docs/site/examples/cpp/pipeline.cpp
@@ -0,0 +1,96 @@
+#include "asapo/asapo_producer.h"
+#include "asapo/asapo_consumer.h"
+#include <cstring>
+#include <iostream>
+
+void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) {
+    if (err && err != asapo::ProducerErrorTemplates::kServerWarning) {
+        std::cerr << "error during send: " << err << std::endl;
+    } else if (err) {
+        std::cout << "warning during send: " << err << std::endl;
+    } else {
+        std::cout << "successfully sent " << payload.original_header.Json() << std::endl;
+    }
+}
+
+void exit_if_error(std::string error_string, const asapo::Error& err) {
+    if (err) {
+        std::cerr << error_string << err << std::endl;
+        exit(EXIT_FAILURE);
+    }
+}
+
+int main(int argc, char* argv[]) {
+    asapo::Error err;
+
+    auto endpoint = "localhost:8400";
+    auto beamtime = "asapo_test";
+
+    auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
+                 "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
+                 "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG"
+                 "VzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGV"
+                 "zIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4"
+                 "t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU";
+
+    auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test";
+
+    auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token};
+
+    auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err);
+    exit_if_error("Cannot start producer", err);
+    auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, credentials, &err);
+    exit_if_error("Cannot start consumer", err);
+    consumer->SetTimeout(5000);
+    auto group_id = consumer->GenerateNewGroupId(&err);
+    exit_if_error("Cannot create group id", err);
+
+    // put the processed messages into a new stream
+    auto pipelined_stream_name = "pipelined";
+
+    asapo::MessageMeta mm;
+    asapo::MessageData data;
+
+    do {
+        // we expect the messages to be in the 'default' stream already
+        err = consumer->GetNext(group_id, &mm, &data, "default");
+
+        if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
+            std::cout << "stream finished" << std::endl;
+            break;
+        }
+
+        if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
+            std::cout << "stream ended" << std::endl;
+            break;
+        }
+        exit_if_error("Cannot get next record", err);
+
+        // work on our data
+        auto processed_string = std::string(reinterpret_cast<char const*>(data.get())) + " processed";
+        auto send_size = processed_string.size() + 1;
+        auto buffer = asapo::MessageData(new uint8_t[send_size]);
+        memcpy(buffer.get(), processed_string.c_str(), send_size);
+
+        // you may use the same filename if you want to overwrite the source file. This will result in a warning, but it is a valid use case
+        asapo::MessageHeader message_header{mm.id, send_size, std::string("processed/test_file_") + std::to_string(mm.id)};
+        err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, pipelined_stream_name, &ProcessAfterSend);
+        exit_if_error("Cannot send message", err);
+    } while (1);
+
+
+    err = producer->WaitRequestsFinished(2000);
+    exit_if_error("Producer exit on timeout", err);
+
+    // the meta from the last iteration corresponds to the last message
+    auto last_id = mm.id;
+
+    err = producer->SendStreamFinishedFlag("pipelined", last_id, "", &ProcessAfterSend);
+    exit_if_error("Cannot finish stream", err);
+
+    // you can remove the source stream if you do not need it anymore
+    err = consumer->DeleteStream("default", asapo::DeleteStreamOptions{true, true});
+    exit_if_error("Cannot delete stream", err);
+
+    return EXIT_SUCCESS;
+}
diff --git a/docs/site/examples/cpp/produce.cpp b/docs/site/examples/cpp/produce.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..9faa61106eff6e55c502df383854e9275b1fe111
--- /dev/null
+++ b/docs/site/examples/cpp/produce.cpp
@@ -0,0 +1,78 @@
+#include "asapo/asapo_producer.h"
+#include <cstring>
+#include <iostream>
+#include <string>
+
+void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) {
+    if (err && err != asapo::ProducerErrorTemplates::kServerWarning) {
+        // the data was not sent. Something is terribly wrong.
+        std::cerr << "error during send: " << err << std::endl;
+    } else if (err) {
+        // the data was sent, but there was some unexpected problem, e.g. the file was overwritten
+ std::cout << "warning during send: " << err << std::endl; + } else { + // all fine + std::cout << "successfuly send " << payload.original_header.Json() << std::endl; + return; + } +} + +void exit_if_error(std::string error_string, const asapo::Error& err) { + if (err) { + std::cerr << error_string << err << std::endl; + exit(EXIT_FAILURE); + } +} + +int main(int argc, char* argv[]) { + asapo::Error err; + + auto endpoint = "localhost:8400"; + auto beamtime = "asapo_test"; + + auto credentials = asapo::SourceCredentials + { + asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS + beamtime, // the folder should exist + "", // can be empty or "auto", if beamtime_id is given + "test_source", // source + "" // athorization token + }; + + auto producer = asapo::Producer::Create(endpoint, + 1, // number of threads. Increase, if the sending speed seems slow + asapo::RequestHandlerType::kTcp, // Use kTcp. Use kFilesystem for direct storage of files + credentials, + 60000, // timeout. Do not change. + &err); + exit_if_error("Cannot start producer", err); + + // the message must be manually copied to the buffer of the relevant size + std::string to_send = "hello"; + auto send_size = to_send.size() + 1; + auto buffer = asapo::MessageData(new uint8_t[send_size]); + memcpy(buffer.get(), to_send.c_str(), send_size); + + // we are sending a message with with index 1. Filename must start with processed/ + asapo::MessageHeader message_header{1, send_size, "processed/test_file"}; + // use the default stream + err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); + exit_if_error("Cannot send message", err); + + // send data in loop + + // add the following at the end of the script + + err = producer->WaitRequestsFinished(2000); // will synchronously wait for all the data to be sent. + // Use it when no more data is expected. + exit_if_error("Producer exit on timeout", err); + + // you may want to mark the stream as finished + err = producer->SendStreamFinishedFlag("default", // name of the stream. 
+ 1, // the number of the last message in the stream + "", // next stream or empty + &ProcessAfterSend); + exit_if_error("Cannot finish stream", err); + std::cout << "stream finished" << std::endl; + + return EXIT_SUCCESS; +} diff --git a/docs/site/examples/cpp/produce_dataset.cpp b/docs/site/examples/cpp/produce_dataset.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e2ea32e77bd291cafeae3ed1d9febdbed9f57ba4 --- /dev/null +++ b/docs/site/examples/cpp/produce_dataset.cpp @@ -0,0 +1,73 @@ +#include "asapo/asapo_producer.h" +#include <iostream> + +void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { + if (err && err != asapo::ProducerErrorTemplates::kServerWarning) { + std::cerr << "error during send: " << err << std::endl; + return; + } else if (err) { + std::cout << "warning during send: " << err << std::endl; + } else { + std::cout << "successfully sent " << payload.original_header.Json() << std::endl; + return; + } +} + +void exit_if_error(std::string error_string, const asapo::Error& err) { + if (err) { + std::cerr << error_string << err << std::endl; + exit(EXIT_FAILURE); + } +} + +int main(int argc, char* argv[]) { + asapo::Error err; + + auto endpoint = "localhost:8400"; + auto beamtime = "asapo_test"; + + auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", ""}; + + auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err); + exit_if_error("Cannot start producer", err); + + std::string to_send = "hello dataset 1"; + auto send_size = to_send.size() + 1; + auto buffer = asapo::MessageData(new uint8_t[send_size]); + memcpy(buffer.get(), to_send.c_str(), send_size); + + // add the additional parameters to the header: part number in the dataset and the total number of parts + asapo::MessageHeader message_header{1, send_size, "processed/test_file_dataset_1", "", 1, 3}; + + err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); + exit_if_error("Cannot send message", err); + + // this can be done from different producers in any order + // we do not recalculate send_size since we know it to be the same + // we reuse the header to shorten the code + to_send = "hello dataset 2"; + buffer = asapo::MessageData(new uint8_t[send_size]); + memcpy(buffer.get(), to_send.c_str(), send_size); + + message_header.dataset_substream = 2; + err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); + exit_if_error("Cannot send message", err); + + to_send = "hello dataset 3"; + buffer = asapo::MessageData(new uint8_t[send_size]); + memcpy(buffer.get(), to_send.c_str(), send_size); + + message_header.dataset_substream = 3; + err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); + exit_if_error("Cannot send message", err); + + err = producer->WaitRequestsFinished(2000); + exit_if_error("Producer exit on timeout", err); + + // the dataset parts are not counted towards the number of messages in the stream + // the last message id in this example is still 1 + err = producer->SendStreamFinishedFlag("default", 1, "", &ProcessAfterSend); + exit_if_error("Cannot finish stream", err); + + return EXIT_SUCCESS; +} diff --git a/docs/site/examples/cpp/query.cpp b/docs/site/examples/cpp/query.cpp new file mode 100644 index 
0000000000000000000000000000000000000000..945b5211a1f0a3c41603617f78b25f7d8aa118fc --- /dev/null +++ b/docs/site/examples/cpp/query.cpp @@ -0,0 +1,114 @@ +#include "asapo/asapo_producer.h" +#include "asapo/asapo_consumer.h" +#include <iostream> +#include <chrono> + +void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { + if (err && err != asapo::ProducerErrorTemplates::kServerWarning) { + std::cerr << "error during send: " << err << std::endl; + return; + } else if (err) { + std::cout << "warning during send: " << err << std::endl; + } else { + std::cout << "successfully sent " << payload.original_header.Json() << std::endl; + return; + } +} + +void PrintMessages(asapo::MessageMetas metas, std::unique_ptr<asapo::Consumer>& consumer) { + asapo::MessageData data; + asapo::Error err; + for (size_t i = 0; i < metas.size(); i++) { + err = consumer->RetrieveData(&metas[i], &data); + std::cout << "Message #" << metas[i].id + << ", content: " << reinterpret_cast<char const*>(data.get()) + << ", user metadata: " << metas[i].metadata << std::endl; + } +} + +void exit_if_error(std::string error_string, const asapo::Error& err) { + if (err) { + std::cerr << error_string << err << std::endl; + exit(EXIT_FAILURE); + } +} + +int main(int argc, char* argv[]) { + asapo::Error err; + + auto endpoint = "localhost:8400"; + auto beamtime = "asapo_test"; + + auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl" + "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN" + "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG" + "VzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGV" + "zIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4" + "t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU"; + + auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; + + auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token}; + + auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err); + exit_if_error("Cannot start producer", err); + + producer->SetLogLevel(asapo::LogLevel::Error); + + err = producer->DeleteStream("default", 0, asapo::DeleteStreamOptions{true, true}); + exit_if_error("Cannot delete stream", err); + + // let's start with producing some messages with metadata + for (uint64_t i = 1; i <= 10; i++) { + auto message_metadata = "{" + " \"condition\": \"condition #" + std::to_string(i) + "\"," + " \"somevalue\": " + std::to_string(i * 10) + + "}"; + + std::string to_send = "message#" + std::to_string(i); + auto send_size = to_send.size() + 1; + auto buffer = asapo::MessageData(new uint8_t[send_size]); + memcpy(buffer.get(), to_send.c_str(), send_size); + + asapo::MessageHeader message_header{i, send_size, "processed/test_file_" + std::to_string(i), message_metadata}; + err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); + exit_if_error("Cannot send message", err); + } + + err = producer->WaitRequestsFinished(2000); + exit_if_error("Producer exit on timeout", err); + + auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, credentials, &err); + exit_if_error("Cannot create consumer", err); + consumer->SetTimeout(5000); + + auto metadatas = consumer->QueryMessages("_id = 1", "default", &err); + exit_if_error("Cannot query messages", err); + std::cout << "Message with ID = 1" << std::endl; + PrintMessages(metadatas, consumer); + + metadatas = consumer->QueryMessages("_id >= 8", "default", &err); + 
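// queries use an SQL-like filter syntax; besides _id, the filters below also use meta.<field> and timestamp + 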
exit_if_error("Cannot query messages", err); + std::cout << "essages with ID >= 8" << std::endl; + PrintMessages(metadatas, consumer); + + metadatas = consumer->QueryMessages("meta.condition = \"condition #7\"", "default", &err); + exit_if_error("Cannot query messages", err); + std::cout << "Message with condition = 'condition #7'" << std::endl; + PrintMessages(metadatas, consumer); + + metadatas = consumer->QueryMessages("meta.somevalue > 30 AND meta.somevalue < 60", "default", &err); + exit_if_error("Cannot query messages", err); + std::cout << "Message with 30 < somevalue < 60" << std::endl; + PrintMessages(metadatas, consumer); + + auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); + auto fifteen_minutes_ago = std::chrono::duration_cast<std::chrono::nanoseconds>((std::chrono::system_clock::now() - std::chrono::minutes(15)).time_since_epoch()).count(); + std::cout << now << " " << fifteen_minutes_ago << std::endl; + metadatas = consumer->QueryMessages("timestamp < " + std::to_string(now) + " AND timestamp > " + std::to_string(fifteen_minutes_ago), "default", &err); + exit_if_error("Cannot query messages", err); + std::cout << "Messages in the last 15 minutes" << std::endl; + PrintMessages(metadatas, consumer); + + return EXIT_SUCCESS; +} diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/c/Makefile b/docs/site/examples/frozen_versions/21.06.0/getting_started/c/Makefile deleted file mode 100644 index ba3d4a872835ae3d20e6dfe43452a6ede2771332..0000000000000000000000000000000000000000 --- a/docs/site/examples/frozen_versions/21.06.0/getting_started/c/Makefile +++ /dev/null @@ -1,29 +0,0 @@ -PROGRAM=asapo-consume - -LDFLAGS = "-Wl,-rpath,/opt/asapo/lib" -CFLAGS += `PKG_CONFIG_PATH=/opt/asapo/lib/pkgconfig pkg-config --cflags libasapo-consumer` -LIBS = `PKG_CONFIG_PATH=/opt/asapo/lib/pkgconfig pkg-config --libs libasapo-consumer` - -# for default installation -#LDFLAGS = -#CFLAGS += `pkg-config --cflags libasapo-consumer` -#LIBS = `pkg-config --libs libasapo-consumer` - -RM=rm -f - -SRCS=consume.c -OBJS=$(subst .c,.o,$(SRCS)) - -all: $(PROGRAM) - -$(PROGRAM): $(OBJS) - $(CC) $(LDFLAGS) -o $@ $^ $(LIBS) - -%.o: %.cpp - $(CC) $(CFLAGS) $(INCLUDE) -c -o $@ $< - -clean: - $(RM) $(OBJS) - -distclean: clean - $(RM) $(PROGRAM) diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/python/produce.py b/docs/site/examples/frozen_versions/21.06.0/getting_started/python/produce.py deleted file mode 100644 index e8e9efd86dc089c6e023f201e3ac19501a20e5a4..0000000000000000000000000000000000000000 --- a/docs/site/examples/frozen_versions/21.06.0/getting_started/python/produce.py +++ /dev/null @@ -1,20 +0,0 @@ -import asapo_producer - -def callback(payload,err): - if err is not None: - print("could not sent: ",payload,err) - else: - print ("successfuly sent: ",payload) - -endpoint = "localhost:8400" -beamtime = "asapo_test" - -# source type 'processed' to write to the core filesystem -producer = asapo_producer.create_producer(endpoint,'processed', - beamtime,'auto','test_source','', 1,60000) - -# we are sending a message with with index 1 to the default stream. 
Filename must start with processed/ -producer.send(1, "processed/test_file",b"hello", - callback = callback) - -producer.wait_requests_finished(2000) diff --git a/docs/site/examples/getting_started/c/Makefile b/docs/site/examples/getting_started/c/Makefile deleted file mode 100644 index ba3d4a872835ae3d20e6dfe43452a6ede2771332..0000000000000000000000000000000000000000 --- a/docs/site/examples/getting_started/c/Makefile +++ /dev/null @@ -1,29 +0,0 @@ -PROGRAM=asapo-consume - -LDFLAGS = "-Wl,-rpath,/opt/asapo/lib" -CFLAGS += `PKG_CONFIG_PATH=/opt/asapo/lib/pkgconfig pkg-config --cflags libasapo-consumer` -LIBS = `PKG_CONFIG_PATH=/opt/asapo/lib/pkgconfig pkg-config --libs libasapo-consumer` - -# for default installation -#LDFLAGS = -#CFLAGS += `pkg-config --cflags libasapo-consumer` -#LIBS = `pkg-config --libs libasapo-consumer` - -RM=rm -f - -SRCS=consume.c -OBJS=$(subst .c,.o,$(SRCS)) - -all: $(PROGRAM) - -$(PROGRAM): $(OBJS) - $(CC) $(LDFLAGS) -o $@ $^ $(LIBS) - -%.o: %.cpp - $(CC) $(CFLAGS) $(INCLUDE) -c -o $@ $< - -clean: - $(RM) $(OBJS) - -distclean: clean - $(RM) $(PROGRAM) diff --git a/docs/site/examples/getting_started/cpp/consume.cpp b/docs/site/examples/getting_started/cpp/consume.cpp deleted file mode 100644 index f9124fa9e3e3f3e2e3ca9b94f66c7bb34050ca9e..0000000000000000000000000000000000000000 --- a/docs/site/examples/getting_started/cpp/consume.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include "asapo/asapo_consumer.h" -#include <iostream> - - -void exit_if_error(std::string error_string, const asapo::Error& err) { - if (err) { - std::cerr << error_string << err << std::endl; - exit(EXIT_FAILURE); - } -} - -int main(int argc, char* argv[]) { - asapo::Error err; - - auto endpoint = "localhost:8400"; // // or your endpoint - auto beamtime = "asapo_test"; - auto token = - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU"; - - auto path_to_files = - "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; //set it according to your configuration. 
- - auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token}, &err); - exit_if_error("Cannot create consumer", err); - consumer->SetTimeout((uint64_t) 5000); - - auto group_id = consumer->GenerateNewGroupId(&err); - exit_if_error("Cannot create group id", err); - - asapo::MessageMeta mm; - asapo::MessageData data; - err = consumer->GetNext(group_id, &mm, &data, "default"); - exit_if_error("Cannot get next record", err); - - std::cout << "id: " << mm.id << std::endl; - std::cout << "file name: " << mm.name << std::endl; - std::cout << "message content: " << reinterpret_cast<char const*>(data.get()) << std::endl; - -// delete stream - err = consumer->DeleteStream("default", asapo::DeleteStreamOptions{true, true}); - exit_if_error("Cannot delete stream", err); - std::cout << "stream deleted"; - - return EXIT_SUCCESS; -} diff --git a/docs/site/examples/getting_started/cpp/produce.cpp b/docs/site/examples/getting_started/cpp/produce.cpp deleted file mode 100644 index 0a2cf248698597108450760116cc169f7ab5287c..0000000000000000000000000000000000000000 --- a/docs/site/examples/getting_started/cpp/produce.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include "asapo/asapo_producer.h" - -#include <iostream> - -void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { - if (err) { - std::cerr << "error/warning during send: " << err << std::endl; - return; - } else { - std::cout << "successfuly send " << payload.original_header.Json() << std::endl; - return; - } -} - -void exit_if_error(std::string error_string, const asapo::Error& err) { - if (err) { - std::cerr << error_string << err << std::endl; - exit(EXIT_FAILURE); - } -} - -int main(int argc, char* argv[]) { - asapo::Error err; - - auto endpoint = "localhost:8400"; // or your endpoint - auto beamtime = "asapo_test"; - - auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", ""}, 60000, &err); - exit_if_error("Cannot start producer", err); - - std::string to_send = "hello"; - auto send_size = to_send.size() + 1; - auto buffer = asapo::MessageData(new uint8_t[send_size]); - memcpy(buffer.get(), to_send.c_str(), send_size); - - asapo::MessageHeader message_header{1, send_size, "processed/test_file"}; - err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); - exit_if_error("Cannot send message", err); - - err = producer->WaitRequestsFinished(2000); - exit_if_error("Producer exit on timeout", err); - - return EXIT_SUCCESS; -} diff --git a/docs/site/examples/getting_started/python/consume.py b/docs/site/examples/getting_started/python/consume.py deleted file mode 100644 index a2fae5d3498ab85cb7dec589ccbdb45778e4989d..0000000000000000000000000000000000000000 --- a/docs/site/examples/getting_started/python/consume.py +++ /dev/null @@ -1,19 +0,0 @@ -import asapo_consumer - -endpoint = "localhost:8400" -beamtime = "asapo_test" -token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU" - -path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" #set it according to your configuration. 
-consumer = asapo_consumer.create_consumer(endpoint,path_to_files,False, beamtime,"test_source",token,5000) -group_id = consumer.generate_group_id() - -data, meta = consumer.get_next(group_id, meta_only = False) - -print ('id:',meta['_id']) -print ('file name:',meta['name']) -print ('file content:',data.tobytes().decode("utf-8")) - -#delete stream -consumer.delete_stream(error_on_not_exist = True) -print ('stream deleted') diff --git a/docs/site/examples/getting_started/python/produce.py b/docs/site/examples/getting_started/python/produce.py deleted file mode 100644 index e8e9efd86dc089c6e023f201e3ac19501a20e5a4..0000000000000000000000000000000000000000 --- a/docs/site/examples/getting_started/python/produce.py +++ /dev/null @@ -1,20 +0,0 @@ -import asapo_producer - -def callback(payload,err): - if err is not None: - print("could not sent: ",payload,err) - else: - print ("successfuly sent: ",payload) - -endpoint = "localhost:8400" -beamtime = "asapo_test" - -# source type 'processed' to write to the core filesystem -producer = asapo_producer.create_producer(endpoint,'processed', - beamtime,'auto','test_source','', 1,60000) - -# we are sending a message with with index 1 to the default stream. Filename must start with processed/ -producer.send(1, "processed/test_file",b"hello", - callback = callback) - -producer.wait_requests_finished(2000) diff --git a/docs/site/examples/getting_started/install_cpp_clients.sh b/docs/site/examples/install_cpp_clients.sh similarity index 71% rename from docs/site/examples/getting_started/install_cpp_clients.sh rename to docs/site/examples/install_cpp_clients.sh index db4190c738615551228e7968950f3286cb1c1a6f..22fddcc6df35d65c230b6f75be1d7e3453a5d2ac 100644 --- a/docs/site/examples/getting_started/install_cpp_clients.sh +++ b/docs/site/examples/install_cpp_clients.sh @@ -2,7 +2,7 @@ # you can also install Linux/Windows packages if you have root access (or install locally). # take a look at http://nims.desy.de/extra/asapo/linux_packages/ or http://nims.desy.de/extra/asapo/windows10 for your OS. E.g. 
for Debian 10.7 -wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/asapo-dev-@ASAPO_VERSION_IN_DOCS@-debian10.7.x86_64.deb -sudo apt install ./asapo-dev-@ASAPO_VERSION_IN_DOCS@-debian10.7.x86_64.deb +wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/asapo-dev-100.0.develop-debian10.7.x86_64.deb +sudo apt install ./asapo-dev-100.0.develop-debian10.7.x86_64.deb diff --git a/docs/site/examples/getting_started/install_python_clients_pip.sh b/docs/site/examples/install_python_clients_pip.sh similarity index 60% rename from docs/site/examples/getting_started/install_python_clients_pip.sh rename to docs/site/examples/install_python_clients_pip.sh index 83d8c696f36e30f896e0f1e2110cd1ad57fc5005..3a9d1faf519ab8d49dc273f1c4cbd4ec63f09eb8 100644 --- a/docs/site/examples/getting_started/install_python_clients_pip.sh +++ b/docs/site/examples/install_python_clients_pip.sh @@ -1,13 +1,13 @@ #!/usr/bin/env bash -pip3 install --user --trusted-host nims.desy.de --find-links=http://nims.desy.de/extra/asapo/linux_wheels asapo_producer==@ASAPO_WHEEL_VERSION_IN_DOCS@ -pip3 install --user --trusted-host nims.desy.de --find-links=http://nims.desy.de/extra/asapo/linux_wheels asapo_consumer==@ASAPO_WHEEL_VERSION_IN_DOCS@ +pip3 install --user --trusted-host nims.desy.de --find-links=http://nims.desy.de/extra/asapo/linux_wheels asapo_producer==100.0.develop +pip3 install --user --trusted-host nims.desy.de --find-links=http://nims.desy.de/extra/asapo/linux_wheels asapo_consumer==100.0.develop # you might need to update pip if the above commands error: pip3 install --upgrade pip # if that does not work (abi incompatibility, etc) you may try to install source packages # take a look at http://nims.desy.de/extra/asapo/linux_packages/ or http://nims.desy.de/extra/asapo/windows10 for your OS. E.g. for Debian 10.7 -# wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/asapo_producer-@ASAPO_VERSION_IN_DOCS@.tar.gz -# wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/asapo_consumer-@ASAPO_VERSION_IN_DOCS@.tar.gz +# wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/asapo_producer-100.0.develop.tar.gz +# wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/asapo_consumer-100.0.develop.tar.gz -# pip3 install asapo_producer-@ASAPO_VERSION_IN_DOCS@.tar.gz -# pip3 install asapo_consumer-@ASAPO_VERSION_IN_DOCS@.tar.gz \ No newline at end of file +# pip3 install asapo_producer-100.0.develop.tar.gz +# pip3 install asapo_consumer-100.0.develop.tar.gz diff --git a/docs/site/examples/getting_started/install_python_clients_pkg.sh b/docs/site/examples/install_python_clients_pkg.sh similarity index 55% rename from docs/site/examples/getting_started/install_python_clients_pkg.sh rename to docs/site/examples/install_python_clients_pkg.sh index 5c13804b7381097511a0778068c098f9f8c0eec5..4542fcc48ad4807ff07ae72422d429aa1f38a3c7 100644 --- a/docs/site/examples/getting_started/install_python_clients_pkg.sh +++ b/docs/site/examples/install_python_clients_pkg.sh @@ -2,7 +2,8 @@ # you can also install Linux/Windows packages if you have root access (or install locally). # take a look at http://nims.desy.de/extra/asapo/linux_packages/ or http://nims.desy.de/extra/asapo/windows10 for your OS. E.g. 
for Debian 10.7 -wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/python-asapo-producer_@ASAPO_VERSION_IN_DOCS@-debian10.7_amd64.deb -wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/python-asapo-consumer_@ASAPO_VERSION_IN_DOCS@-debian10.7_amd64.deb -sudo apt install ./python3-asapo-producer_@ASAPO_VERSION_IN_DOCS@-debian10.7_amd64.deb -sudo apt install ./python3-asapo_consumer_@ASAPO_VERSION_IN_DOCS@-debian10.7_amd64.deb \ No newline at end of file +wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/python-asapo-producer_100.0~develop-debian10.7_amd64.deb +wget http://nims.desy.de/extra/asapo/linux_packages/debian10.7/python-asapo-consumer_100.0~develop-debian10.7_amd64.deb + +sudo apt install ./python3-asapo-producer_100.0~develop-debian10.7_amd64.deb +sudo apt install ./python3-asapo_consumer_100.0~develop-debian10.7_amd64.deb diff --git a/docs/site/examples/python/acknowledgements.py b/docs/site/examples/python/acknowledgements.py new file mode 100644 index 0000000000000000000000000000000000000000..f9eb7aba304a668d5f0af7cee5ab3761f9d7f88c --- /dev/null +++ b/docs/site/examples/python/acknowledgements.py @@ -0,0 +1,70 @@ +import asapo_consumer +import asapo_producer + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + print("could not send: ",payload,err) + elif err is not None: + print("sent with warning: ",payload,err) + else: + print("successfully sent: ",payload) + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" +"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ" +"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ" +"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ" +"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU") + +path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" + +producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', '', 1, 60000) +producer.set_log_level('error') + +# let's start with producing a sample of 10 simple messages +for i in range(1, 11): + producer.send(i, "processed/test_file_ack_" + str(i), ('message #' + str(i)).encode(), stream = "default", callback = callback) + +producer.wait_requests_finished(2000) + +consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True, beamtime, "test_source", token, 5000) +group_id = consumer.generate_group_id() + +# a flag to distinguish the first attempt at message #3 +firstTryNegative = True + +try: + while True: + data, meta = consumer.get_next(group_id, meta_only = False) + text_data = data.tobytes().decode("utf-8") + message_id = meta['_id'] + + # acknowledge all the messages except #3, #5 and #7 + if message_id not in [3,5,7]: + print('Acknowledge the message #', message_id) + consumer.acknowledge(group_id, message_id) + + # for message #3 we issue a negative acknowledgement, which makes it available for delivery again + # in this case, it will be put at the end of the stream + if message_id == 3: + if firstTryNegative: + print('Negative acknowledgement of the message #', message_id) + # negatively acknowledge with a redelivery delay of 2 seconds + consumer.neg_acknowledge(group_id, message_id, delay_ms=2000) + firstTryNegative = False + else: + # on our second attempt we acknowledge the message + print('Second try of the message #', message_id) + consumer.acknowledge(group_id, message_id) + +except asapo_consumer.AsapoStreamFinishedError: + print('stream finished') + 
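+# note: message #3 is acknowledged on its second delivery, so only messages #5 and #7 will remain unacknowledged below 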
+except asapo_consumer.AsapoEndOfStreamError: + print('stream ended') + +for message_id in consumer.get_unacknowledged_messages(group_id): + data, meta = consumer.get_by_id(message_id, meta_only = False) + print('Unacknowledged message:', data.tobytes().decode("utf-8"), meta) diff --git a/docs/site/examples/python/consume.py b/docs/site/examples/python/consume.py new file mode 100644 index 0000000000000000000000000000000000000000..dfddaf8d46492b2c9a218cea135797535f2fab78 --- /dev/null +++ b/docs/site/examples/python/consume.py @@ -0,0 +1,51 @@ +import asapo_consumer + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +# test token. In production it is created during the start of the beamtime +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" +"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ" +"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ" +"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ" +"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU") + +# set it according to your configuration. +path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" + + +consumer = asapo_consumer \ + .create_consumer(endpoint, + path_to_files, + True, # True if the path_to_files is accessible locally, False otherwise + beamtime, # Same as for the producer + "test_source", # Same as for the producer + token, # Access token + 5000) # Timeout. How long do you want to wait on non-finished stream for a message. + + +# you can get info about the streams in the beamtime +for stream in consumer.get_stream_list(): + print("Stream name: ", stream['name'], "\n", + "LastId: ", stream['lastId'], "\n", + "Stream finished: ", stream['finished'], "\n", + "Next stream: ", stream['nextStream']) + + +group_id = consumer.generate_group_id() # Several consumers can use the same group_id to process messages in parallel + +try: + + # get_next is the main function to get messages from streams. You would normally call it in loop. 
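+ # several consumers sharing the same group_id split the messages between them; a consumer with a new group_id starts from the beginning of the stream 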
+ # you can either manually compare the meta['_id'] to the stream['lastId'], or wait for the exception to happen + while True: + data, meta = consumer.get_next(group_id, meta_only = False) + print(data.tobytes().decode("utf-8"), meta) + +except asapo_consumer.AsapoStreamFinishedError: + print('stream finished') # all the messages in the stream were processed + +except asapo_consumer.AsapoEndOfStreamError: + print('stream ended') # not-finished stream timeout, or wrong or empty stream + +consumer.delete_stream(error_on_not_exist = True) # you can delete the stream after consuming diff --git a/docs/site/examples/python/consume_dataset.py b/docs/site/examples/python/consume_dataset.py new file mode 100644 index 0000000000000000000000000000000000000000..cc81a95d33382087698f217c926041432f435de2 --- /dev/null +++ b/docs/site/examples/python/consume_dataset.py @@ -0,0 +1,38 @@ +import asapo_consumer + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" +"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ" +"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ" +"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ" +"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU") + +path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" + +consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True, beamtime, "test_source", token, 5000) + +group_id = consumer.generate_group_id() + +try: + + # get_next_dataset behaves similarly to the regular get_next + while True: + dataset = consumer.get_next_dataset(group_id, stream = 'pipelined') + print ('Dataset Id:', dataset['id']) + # the initial response only contains the metadata + # the actual content should be retrieved separately + for metadata in dataset['content']: + data = consumer.retrieve_data(metadata) + print ('Part ' + str(metadata['dataset_substream']) + ' out of ' + str(dataset['expected_size'])) + print (data.tobytes().decode("utf-8"), metadata) + +except asapo_consumer.AsapoStreamFinishedError: + print('stream finished') + +except asapo_consumer.AsapoEndOfStreamError: + print('stream ended') diff --git a/docs/site/examples/python/metadata.py b/docs/site/examples/python/metadata.py new file mode 100644 index 0000000000000000000000000000000000000000..e2860c1653a7bc5d60837789b2e9cb5455b101bf --- /dev/null +++ b/docs/site/examples/python/metadata.py @@ -0,0 +1,114 @@ +import asapo_consumer +import asapo_producer + +import json + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + print("could not send: ",payload,err) + elif err is not None: + print("sent with warning: ",payload,err) + else: + print("successfuly sent: ",payload) + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" +"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ" +"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ" +"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ" +"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU") + +path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" + +producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', '', 1, 60000) +producer.set_log_level('error') + +# sample beamtime metadata. 
You can add any data you want, with any level of complexity +# in this example we use strings and ints, and one nested structure +beamtime_metadata = { + 'name': 'beamtime name', + 'condition': 'beamtime condition', + 'intvalue1': 5, + 'intvalue2': 10, + 'structure': { + 'structint1': 20, + 'structint2': 30 + } +} + +# send the metadata +# by default the new metadata will completely replace the one that's already there +producer.send_beamtime_meta(json.dumps(beamtime_metadata), callback = callback) + +# we can update the existing metadata if we want, by modifying the existing fields, or adding new ones +beamtime_metadata_update = { + 'condition': 'updated beamtime condition', + 'newintvalue': 15 +} + +# send the metadata in the 'update' mode +producer.send_beamtime_meta(json.dumps(beamtime_metadata_update), mode = 'update', callback = callback) + +# sample stream metadata +stream_metadata = { + 'name': 'stream name', + 'condition': 'stream condition', + 'intvalue': 44 +} + +# works the same way: by default we replace the stream metadata, but update is also possible +# update works exactly the same as for beamtime, but here we will only do 'replace' +producer.send_stream_meta(json.dumps(stream_metadata), callback = callback) + +# sample message metadata +message_metadata = { + 'name': 'message name', + 'condition': 'message condition', + 'somevalue': 55 +} + +# the message metadata is sent together with the message itself +# in case of datasets each part has its own metadata +producer.send(1, "processed/test_file", b'hello', user_meta = json.dumps(message_metadata), stream = "default", callback = callback) + +producer.wait_requests_finished(2000) + +consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True, beamtime, "test_source", token, 5000) + +# read the beamtime metadata +beamtime_metadata_read = consumer.get_beamtime_meta() + +# the structure is the same as the one that was sent, and the updated values are already there +print('Name:', beamtime_metadata_read['name']) +print('Condition:', beamtime_metadata_read['condition']) +print('Updated value exists:', 'newintvalue' in beamtime_metadata_read) +print('Sum of int values:', beamtime_metadata_read['intvalue1'] + beamtime_metadata_read['intvalue2']) +print('Nested structure value', beamtime_metadata_read['structure']['structint1']) + +# read the stream metadata +stream_metadata_read = consumer.get_stream_meta(stream = 'default') + +# access various fields from it +print('Stream Name:', stream_metadata_read['name']) +print('Stream Condition:', stream_metadata_read['condition']) +print('Stream int value:', stream_metadata_read['intvalue']) + +group_id = consumer.generate_group_id() +try: + while True: + # right now we are only interested in metadata + data, meta = consumer.get_next(group_id, meta_only = True) + print('Message #', meta['_id']) + + # our custom metadata is stored inside the message metadata + message_metadata_read = meta['meta'] + print('Message Name:', message_metadata_read['name']) + print('Message Condition:', message_metadata_read['condition']) + print('Message int value:', message_metadata_read['somevalue']) +except asapo_consumer.AsapoStreamFinishedError: + print('stream finished') + +except asapo_consumer.AsapoEndOfStreamError: + print('stream ended') diff --git a/docs/site/examples/python/next_stream.py b/docs/site/examples/python/next_stream.py new file mode 100644 index 0000000000000000000000000000000000000000..ff47e8645ded2606f8583f847af997a534471fce --- /dev/null +++ 
b/docs/site/examples/python/next_stream.py @@ -0,0 +1,68 @@ +import asapo_consumer +import asapo_producer + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + print("could not send: ",payload,err) + elif err is not None: + print("sent with warning: ",payload,err) + else: + print("successfuly sent: ",payload) + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" +"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ" +"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ" +"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ" +"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU") + +path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" + +producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', '', 1, 60000) +producer.set_log_level('error') + +# let's start with producing a sample of 10 simple messages +for i in range(1, 11): + producer.send(i, "processed/test_file_" + str(i), ('content of the message #' + str(i)).encode(), stream = 'default', callback = callback) + +# finish the stream and set the next stream to be called 'next' +producer.send_stream_finished_flag('default', i, next_stream = 'next', callback = callback) + +# populate the 'next' stream as well +for i in range(1, 6): + producer.send(i, "processed/test_file_next_" + str(i), ('content of the message #' + str(i)).encode(), stream = 'next', callback = callback) + +# we leave the 'next' stream unfinished, but the chain of streams can be of any length + +producer.wait_requests_finished(2000) + +consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True, beamtime, "test_source", token, 5000) +group_id = consumer.generate_group_id() + +# we start with the 'default' stream (the first one) +stream_name = 'default' + +while True: + try: + data, meta = consumer.get_next(group_id, meta_only = False, stream = stream_name) + text_data = data.tobytes().decode("utf-8") + message_id = meta['_id'] + print('Message #', message_id, ':', text_data) + except asapo_consumer.AsapoStreamFinishedError: + # when the stream finishes, we look for the info on the next stream + # first, we find the stream with our name in the list of streams + stream = next(s for s in consumer.get_stream_list() if s['name'] == stream_name) + # then we look if the field 'nextStream' is set and not empty + if 'nextStream' in stream and stream['nextStream']: + # if it's not, we continue with the next stream + stream_name = stream['nextStream'] + print('Changing stream to the next one:', stream_name) + continue + # otherwise we stop + print('stream finished') + break + except asapo_consumer.AsapoEndOfStreamError: + print('stream ended') + break diff --git a/docs/site/examples/python/pipeline.py b/docs/site/examples/python/pipeline.py new file mode 100644 index 0000000000000000000000000000000000000000..71c782032f2df69e5e64d514e8afadfb8ebcf05c --- /dev/null +++ b/docs/site/examples/python/pipeline.py @@ -0,0 +1,60 @@ +import asapo_consumer +import asapo_producer + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + print("could not send: ",payload,err) + elif err is not None: + print("sent with warning: ",payload,err) + else: + print("successfuly sent: ",payload) + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" 
+"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ" +"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ" +"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ" +"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU") + +path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" + +consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True, beamtime, "test_source", token, 5000) + +producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', '', 1, 60000) + +group_id = consumer.generate_group_id() + +# put the processed message into the new stream +pipelined_stream_name = 'pipelined' + +try: + while True: + # we expect the message to be in the 'default' stream already + data, meta = consumer.get_next(group_id, meta_only = False) + message_id = meta['_id'] + + # work on our data + text_data = data.tobytes().decode("utf-8") + pipelined_message = (text_data + ' processed').encode() + + # you may use the same filename, if you want to rewrite the source file. This will result in warning, but it is a valid usecase + producer.send(message_id, "processed/test_file_" + message_id, pipelined_message, pipelined_stream_name, callback = callback) + + +except asapo_consumer.AsapoStreamFinishedError: + print('stream finished') + +except asapo_consumer.AsapoEndOfStreamError: + print('stream ended') + +producer.wait_requests_finished(2000) + +# the meta from the last iteration corresponds to the last message +last_id = meta['_id'] + +producer.send_stream_finished_flag("pipelined", last_id) + +# you can remove the source stream if you do not need it anymore +consumer.delete_stream(stream = 'default', error_on_not_exist = True) diff --git a/docs/site/examples/python/produce.py b/docs/site/examples/python/produce.py new file mode 100644 index 0000000000000000000000000000000000000000..4d03ccb9a0009ebed824396037e521d91686b9f3 --- /dev/null +++ b/docs/site/examples/python/produce.py @@ -0,0 +1,44 @@ +import asapo_producer + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + # the data was not sent. Something is terribly wrong. + print("could not send: ",payload,err) + elif err is not None: + # The data was sent, but there was some unexpected problem, e.g. the file was overwritten. + print("sent with warning: ",payload,err) + else: + # all fine + print("successfuly sent: ",payload) + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +producer = asapo_producer \ + .create_producer(endpoint, + 'processed', # should be 'processed' or 'raw', 'processed' writes to the core FS + beamtime, # the folder should exist + 'auto', # can be 'auto', if beamtime_id is given + 'test_source', # source + '', # athorization token + 1, # number of threads. Increase, if the sending speed seems slow + 60000) # timeout. Do not change. + +producer.set_log_level("error") # other values are "warning", "info" or "debug". + +# we are sending a message with with index 1 to the default stream. Filename must start with processed/ +producer.send(1, # message number. Should be unique and ordered. + "processed/test_file", # name of the file. Should be unique, or it will be overwritten + b"hello", # binary data + callback = callback) # callback + +# send data in loop + +# add the following at the end of the script + +producer.wait_requests_finished(2000) # will synchronously wait for all the data to be sent. + # Use it when no more data is expected. 
+ +# you may want to mark the stream as finished +producer.send_stream_finished_flag("default", # name of the stream. If you didn't specify the stream in 'send', it would be 'default' + 1) # the number of the last message in the stream diff --git a/docs/site/examples/python/produce_dataset.py b/docs/site/examples/python/produce_dataset.py new file mode 100644 index 0000000000000000000000000000000000000000..ffaae34e1a117762ab161fd949f6a42f50bcfda7 --- /dev/null +++ b/docs/site/examples/python/produce_dataset.py @@ -0,0 +1,27 @@ +import asapo_producer + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + print("could not send: ",payload,err) + elif err is not None: + print("sent with warning: ",payload,err) + else: + print("successfully sent: ",payload) + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', '', 1, 60000) + +# assuming we have three different producers for a single dataset + +# add the additional 'dataset' parameter, which should be (<part_number>, <total_parts_in_dataset>) +producer.send(1, "processed/test_file_dataset_1", b"hello dataset 1", dataset = (1,3), callback = callback) +# this can be done from different producers in any order +producer.send(1, "processed/test_file_dataset_1", b"hello dataset 2", dataset = (2,3), callback = callback) +producer.send(1, "processed/test_file_dataset_1", b"hello dataset 3", dataset = (3,3), callback = callback) + +producer.wait_requests_finished(2000) +# the dataset parts are not counted towards the number of messages in the stream +# the last message id in this example is still 1 +producer.send_stream_finished_flag("default", 1) diff --git a/docs/site/examples/python/query.py b/docs/site/examples/python/query.py new file mode 100644 index 0000000000000000000000000000000000000000..d3d61f7471f6af09cb4c3b0406f7d8851e6d8ec1 --- /dev/null +++ b/docs/site/examples/python/query.py @@ -0,0 +1,76 @@ +import asapo_consumer +import asapo_producer + +import json +from datetime import datetime, timedelta + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + print("could not send: ",payload,err) + elif err is not None: + print("sent with warning: ",payload,err) + else: + print("successfully sent: ",payload) + +endpoint = "localhost:8400" +beamtime = "asapo_test" + +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e" +"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ" +"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ" +"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ" +"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU") + +path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test" + +producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', '', 1, 60000) +producer.set_log_level('error') + +# let's start with producing some messages with metadata +for i in range(1, 11): + metadata = { + 'condition': 'condition #' + str(i), + 'somevalue': i * 10 + } + producer.send(i, "processed/test_file_" + str(i), ('message #' + str(i)).encode(), user_meta = json.dumps(metadata), stream = "default", callback = callback) + +producer.wait_requests_finished(2000) + +consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True, beamtime, "test_source", token, 5000) + +# helper function to print messages +def print_messages(metadatas): + # the query will return
the list of metadatas + for meta in metadatas: + # for each metadata we need to obtain the actual message first + data = consumer.retrieve_data(meta) + print('Message #', meta['_id'], ', content:', data.tobytes().decode("utf-8"), ', usermetadata:', meta['meta']) + +# simple query, same as get_by_id +metadatas = consumer.query_messages('_id = 1') +print('Message with ID = 1') +print_messages(metadatas) + +# the query that requests the range of IDs +metadatas = consumer.query_messages('_id >= 8') +print('Messages with ID >= 8') +print_messages(metadatas) + +# the query that has some specific requirement for message metadata +metadatas = consumer.query_messages('meta.condition = "condition #7"') +print('Message with condition = "condition #7"') +print_messages(metadatas) + +# the query that has several requirements for user metadata +metadatas = consumer.query_messages('meta.somevalue > 30 AND meta.somevalue < 60') +print('Message with 30 < somevalue < 60') +print_messages(metadatas) + +# the query that is based on the message's timestamp +now = datetime.now() +fifteen_minutes_ago = now - timedelta(minutes = 15) +# python uses timestamp in seconds, while ASAP::O in nanoseconds, so we need to multiply it by a billion +metadatas = consumer.query_messages('timestamp < {} AND timestamp > {}'.format(now.timestamp() * 10**9, fifteen_minutes_ago.timestamp() * 10**9)) +print('Messages in the last 15 minutes') +print_messages(metadatas) + diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/start_asapo_socket.sh b/docs/site/examples/start_asapo_socket.sh similarity index 100% rename from docs/site/examples/frozen_versions/21.06.0/getting_started/start_asapo_socket.sh rename to docs/site/examples/start_asapo_socket.sh diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/start_asapo_tcp.sh b/docs/site/examples/start_asapo_tcp.sh similarity index 97% rename from docs/site/examples/frozen_versions/21.06.0/getting_started/start_asapo_tcp.sh rename to docs/site/examples/start_asapo_tcp.sh index 83439b18dc1e5b26d30fc43e4fcaaa4fd0e6d198..41fd0327cfcd9b1d9acd8ff807f33a8c07fcc66c 100644 --- a/docs/site/examples/frozen_versions/21.06.0/getting_started/start_asapo_tcp.sh +++ b/docs/site/examples/start_asapo_tcp.sh @@ -21,7 +21,7 @@ ASAPO_USER=`id -u`:`id -g` mkdir -p $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED $DATA_GLOBAL_SHARED_ONLINE chmod 777 $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED $DATA_GLOBAL_SHARED_ONLINE -cd $SERVICE_DATA_CLUSTER_SHARED +cd $SERVICE_DATA_CLUSTER_SHAREDdetector mkdir -p fluentd grafana influxdb2 mongodb chmod 777 * diff --git a/docs/site/freeze_version.sh b/docs/site/freeze_version.sh new file mode 100755 index 0000000000000000000000000000000000000000..3bc04a16443ce592cad458b9dce704fcbfc93e6c --- /dev/null +++ b/docs/site/freeze_version.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +if [[ -z "${DOCS_VERSION}" ]]; then + echo No version specified + + exit 1 +fi + +echo Freezing version $DOCS_VERSION + +#npm run docusaurus docs:version $DOCS_VERSION + +VERSIONED_EXAMPLES="versioned_examples/version-$DOCS_VERSION" +VERSIONED_EXAMPLES_ESCAPED="versioned_examples\\/version-$DOCS_VERSION" + +#mkdir $VERSIONED_EXAMPLES + +#cp -r examples/* $VERSIONED_EXAMPLES + +CONTENT='content=\"\.\/' + +#replace the links to the code examples to the frozen copies +for file in $(find ./versioned_docs/version-$DOCS_VERSION -type f) +do +ed -s $file <<ED_COMMANDS > /dev/null 2>&1 
+,s/content=\"\?\.\/examples/content=\".\/${VERSIONED_EXAMPLES_ESCAPED}/g +w +ED_COMMANDS +done + +#replace the links to the dev-packages to the versioned ones +for file in $(find ./${VERSIONED_EXAMPLES} -type f) +do +ed -s $file <<ED_COMMANDS > /dev/null 2>&1 +,s/100\.0[~.]develop/${DOCS_VERSION}/g +w +ED_COMMANDS +done + +exit 0 diff --git a/docs/site/sidebars.js b/docs/site/sidebars.js index a1ff08e60ca70bd76e6fa83d3cfd9fba75859783..f58886388e503bf74323f50c0e1e7bf8eca4e4ad 100644 --- a/docs/site/sidebars.js +++ b/docs/site/sidebars.js @@ -13,12 +13,27 @@ module.exports = { 'core-architecture', ], }, - { + { type: 'category', label: 'Use Cases', items: [ 'p02.1', ], }, + { + type: 'category', + label: 'Code Examples', + items: [ + 'cookbook/overview', + 'cookbook/simple-producer', + 'cookbook/simple-consumer', + 'cookbook/simple-pipeline', + 'cookbook/datasets', + 'cookbook/acknowledgements', + 'cookbook/metadata', + 'cookbook/next_stream', + 'cookbook/query' + ] + } ], }; diff --git a/docs/site/src/theme/CodeBlock.tsx b/docs/site/src/theme/CodeBlock.tsx index d806f1954181aac5af3f7abc6ab0d43b0ab30658..f6046c51d9f1ae51e3e6e5bd0c9fee7548769de5 100644 --- a/docs/site/src/theme/CodeBlock.tsx +++ b/docs/site/src/theme/CodeBlock.tsx @@ -1,10 +1,7 @@ import React from 'react' import InitCodeBlock from '@theme-init/CodeBlock' -import useDocusaurusContext from '@docusaurus/useDocusaurusContext'; - - -const requireContext = require.context('../../examples/', true, /(\.sh|\.py|\.cpp|\.c|\.txt|Makefile)$/); +const requireContext = require.context('../../', true, /(\.sh|\.py|\.cpp|\.c|\.txt|Makefile)$/); const noteStyle: React.CSSProperties = { textAlign: 'right', @@ -16,33 +13,25 @@ export interface State { isCancelled: boolean } -function getVal(name: string, props: any) { - const codeRegex = new RegExp("(?:" + name + "=\")(.*?)(\")") - - let val = undefined - if (props.metastring && codeRegex.test(props.metastring)) { - val = props.metastring.match(codeRegex)[1]; - } - return val; -} - function ReferenceCode(props: any) { - const codeBlockContent = getVal("content", props) + let codeBlockContent = props.content - if (!codeBlockContent) { + if (codeBlockContent == undefined) { return ( <InitCodeBlock {...props}/> ); } - const {siteConfig} = useDocusaurusContext(); - const version = siteConfig.customFields.version; - console.log(siteConfig); - const urlLink = "https://stash.desy.de/projects/ASAPO/repos/asapo/browse/examples/for_site/" + codeBlockContent + "?at=" + version + codeBlockContent = codeBlockContent.replace(/"/g,'') + + const urlLink = "https://stash.desy.de/projects/ASAPO/repos/asapo/browse/docs/site/" + codeBlockContent - const snippetTag = getVal("snippetTag", props) + let snippetTag = props.snippetTag + if (snippetTag !== undefined) { + snippetTag = snippetTag.replace(/"/g,'') + } + if (codeBlockContent) { - const c = codeBlockContent.replace('@ASAPO_EXAMPLES_DIR@', '.') - const res = requireContext(c) + const res = requireContext(codeBlockContent) let body = res.default.split('\n') const fromLine = body.indexOf(snippetTag + " snippet_start") + 1; const toLine = body.indexOf(snippetTag + " snippet_end", fromLine) - 1; @@ -76,4 +65,4 @@ export default function CodeBlock(props) { return ( <ReferenceCode {...props} /> ); -} \ No newline at end of file +} diff --git a/docs/site/versioned_docs/version-21.06.0/getting-started.mdx b/docs/site/versioned_docs/version-21.06.0/getting-started.mdx index 2ba2462b4040589357a2d0f0a2cf470677d0f559..5460625146ecf712d252ee6685dcb0c4a0c525d5 100644 --- 
a/docs/site/versioned_docs/version-21.06.0/getting-started.mdx +++ b/docs/site/versioned_docs/version-21.06.0/getting-started.mdx @@ -32,14 +32,14 @@ unix socket or a tcp port for communications) }> <TabItem value="unix"> -```shell content="./frozen_versions/21.06.0/getting_started/start_asapo_socket.sh" +```shell content="./versioned_examples/version-21.06.0/start_asapo_socket.sh" ``` </TabItem> <TabItem value="tcp"> -```shell content="./frozen_versions/21.06.0/getting_started/start_asapo_tcp.sh" +```shell content="./versioned_examples/version-21.06.0/start_asapo_tcp.sh" ``` </TabItem> @@ -84,19 +84,19 @@ Now you can install Python packages or C++ libraries for ASAPO Producer and Cons }> <TabItem value="python-pip"> -```shell content="./frozen_versions/21.06.0/getting_started/install_python_clients_pip.sh" snippetTag="#snippet1" +```shell content="./versioned_examples/version-21.06.0/install_python_clients_pip.sh" snippetTag="#snippet1" ``` </TabItem> <TabItem value="python-packages"> -```shell content="./frozen_versions/21.06.0/getting_started/install_python_clients_pkg.sh" +```shell content="./versioned_examples/version-21.06.0/install_python_clients_pkg.sh" ``` </TabItem> <TabItem value="cpp"> -```shell content="./frozen_versions/21.06.0/getting_started/install_cpp_clients.sh" +```shell content="./versioned_examples/version-21.06.0/install_cpp_clients.sh" ``` </TabItem> @@ -116,7 +116,7 @@ Now you can install Python packages or C++ libraries for ASAPO Producer and Cons Now you can write a Producer client (API documentation [here](http://asapo.desy.de/python/producer.html)). -```shell content="./frozen_versions/21.06.0/getting_started/python/produce.py" +```shell content="./versioned_examples/version-21.06.0/python/produce.py" ``` Execute it with python3 @@ -131,12 +131,12 @@ $ python3 produce.py Now you can write a Producer client (API documentation [here](http://asapo.desy.de/cpp/producer)). -```shell content="./frozen_versions/21.06.0/getting_started/cpp/produce.cpp" +```shell content="./versioned_examples/version-21.06.0/cpp/produce.cpp" ``` Compile e.g. using CMake and execute. You might need to point cmake (with CMAKE_PREFIX_PATH) to asapo installation and curl library if installed to non-standard location. -```shell content="./frozen_versions/21.06.0/getting_started/cpp/CMakeLists.txt" snippetTag="#producer" +```shell content="./versioned_examples/version-21.06.0/cpp/CMakeLists.txt" snippetTag="#producer" ``` ``` @@ -173,7 +173,7 @@ A consumer data that reads the message ingested during step 3. Note that a token Complete API documentation [here](http://asapo.desy.de/python/consumer.html) -```shell content="./frozen_versions/21.06.0/getting_started/python/consume.py" +```shell content="./versioned_examples/version-21.06.0/python/consume.py" ``` Execute it with python3 @@ -186,12 +186,12 @@ $ python3 consumer.py <TabItem value="cpp"> -```shell content="./frozen_versions/21.06.0/getting_started/cpp/consume.cpp" +```shell content="./versioned_examples/version-21.06.0/cpp/consume.cpp" ``` Compile e.g. using CMake and execute. You might need to point cmake (with CMAKE_PREFIX_PATH) to asapo installation and curl library if installed to non-standard location. 
-```shell content="./frozen_versions/21.06.0/getting_started/cpp/CMakeLists.txt" snippetTag="#consumer"
+```shell content="./versioned_examples/version-21.06.0/cpp/CMakeLists.txt" snippetTag="#consumer"
 ```
 
 ```
@@ -203,12 +203,12 @@ $ ./asapo-consume
 
 <TabItem value="c">
 
-```shell content="./frozen_versions/21.06.0/getting_started/c/consume.c"
+```shell content="./versioned_examples/version-21.06.0/c/consume.c"
 ```
 
 Compile, e.g. using the Makefile and pkg-config (although we recommend CMake, see the C++ section), and execute. This example assumes ASAPO is installed to /opt/asapo. Adjust accordingly.
 
-```shell content="./frozen_versions/21.06.0/getting_started/c/Makefile" snippetTag="#consumer"
+```shell content="./versioned_examples/version-21.06.0/c/Makefile" snippetTag="#consumer"
 ```
 
 ```
@@ -234,7 +234,7 @@ stream deleted
 
 Optionally, the last step is to stop the ASAPO services and remove the files:
 
-```shell content="./frozen_versions/21.06.0/getting_started/cleanup.sh"
+```shell content="./versioned_examples/version-21.06.0/cleanup.sh"
 ```
 
 <br/><br/>
diff --git a/docs/site/versioned_examples/.gitignore b/docs/site/versioned_examples/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..3d758890d349a004d946898e11906fe95d43b2d8
--- /dev/null
+++ b/docs/site/versioned_examples/.gitignore
@@ -0,0 +1 @@
+!Makefile
\ No newline at end of file
diff --git a/docs/site/examples/getting_started/c/consume.c b/docs/site/versioned_examples/version-21.06.0/c/consume.c
similarity index 100%
rename from docs/site/examples/getting_started/c/consume.c
rename to docs/site/versioned_examples/version-21.06.0/c/consume.c
diff --git a/docs/site/examples/getting_started/cleanup.sh b/docs/site/versioned_examples/version-21.06.0/cleanup.sh
similarity index 100%
rename from docs/site/examples/getting_started/cleanup.sh
rename to docs/site/versioned_examples/version-21.06.0/cleanup.sh
diff --git a/docs/site/examples/getting_started/cpp/CMakeLists.txt b/docs/site/versioned_examples/version-21.06.0/cpp/CMakeLists.txt
similarity index 100%
rename from docs/site/examples/getting_started/cpp/CMakeLists.txt
rename to docs/site/versioned_examples/version-21.06.0/cpp/CMakeLists.txt
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/consume.cpp b/docs/site/versioned_examples/version-21.06.0/cpp/consume.cpp
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/consume.cpp
rename to docs/site/versioned_examples/version-21.06.0/cpp/consume.cpp
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/produce.cpp b/docs/site/versioned_examples/version-21.06.0/cpp/produce.cpp
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/produce.cpp
rename to docs/site/versioned_examples/version-21.06.0/cpp/produce.cpp
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/install_cpp_clients.sh b/docs/site/versioned_examples/version-21.06.0/install_cpp_clients.sh
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/install_cpp_clients.sh
rename to docs/site/versioned_examples/version-21.06.0/install_cpp_clients.sh
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/install_python_clients_pip.sh b/docs/site/versioned_examples/version-21.06.0/install_python_clients_pip.sh
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/install_python_clients_pip.sh
rename to docs/site/versioned_examples/version-21.06.0/install_python_clients_pip.sh
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/install_python_clients_pkg.sh b/docs/site/versioned_examples/version-21.06.0/install_python_clients_pkg.sh
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/install_python_clients_pkg.sh
rename to docs/site/versioned_examples/version-21.06.0/install_python_clients_pkg.sh
diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/python/consume.py b/docs/site/versioned_examples/version-21.06.0/python/consume.py
similarity index 100%
rename from docs/site/examples/frozen_versions/21.06.0/getting_started/python/consume.py
rename to docs/site/versioned_examples/version-21.06.0/python/consume.py
diff --git a/docs/site/versioned_examples/version-21.06.0/python/produce.py b/docs/site/versioned_examples/version-21.06.0/python/produce.py
new file mode 100644
index 0000000000000000000000000000000000000000..0742fe3afa29805ada23b983cf97bca7aaf418c2
--- /dev/null
+++ b/docs/site/versioned_examples/version-21.06.0/python/produce.py
@@ -0,0 +1,43 @@
+import asapo_producer
+
+def callback(payload, err):
+    if err is not None and "server warning" not in err:
+        # the data was not sent. Something is terribly wrong.
+        print("could not send: ", payload, err)
+    elif err is not None:
+        # the data was sent, but there was some unexpected problem, e.g. the file was overwritten
+        print("sent with warning: ", payload, err)
+    else:
+        # all fine
+        print("successfully sent: ", payload)
+
+endpoint = "localhost:8400"
+beamtime = "asapo_test"
+
+producer = asapo_producer.create_producer(endpoint,
+                                          'processed',    # should be 'processed' or 'raw'; 'processed' writes to the core FS
+                                          beamtime,       # the folder should exist
+                                          'auto',         # can be 'auto' if beamtime_id is given
+                                          'test_source',  # source
+                                          '',             # authorization token
+                                          1,              # number of threads. Increase if the sending speed seems slow.
+                                          60000)          # timeout. Do not change.
+
+producer.set_log_level("error")  # other values are "warning", "info" or "debug"
+
+# we are sending a message with index 1 to the default stream. Filename must start with processed/
+producer.send(1,                      # message number. Should be unique and ordered.
+              "processed/test_file",  # name of the file. Should be unique, or it will be overwritten.
+              b"hello",               # binary data
+              callback = callback)    # callback
+
+# send data in loop
+
+# add the following at the end of the script
+
+producer.wait_requests_finished(2000)  # will synchronously wait for all the data to be sent.
+                                       # Use it when no more data is expected.
+
+# you may want to mark the stream as finished
+producer.send_stream_finished_flag("default",  # name of the stream. If you didn't specify the stream in 'send', it is 'default'.
+                                   1)          # the number of the last message in the stream
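
For orientation, the `# send data in loop` placeholder in produce.py above would typically be filled with repeated `send()` calls. Here is a minimal sketch, reusing only the `send()` signature and `callback` shown above; `n_messages` and the filename pattern are invented for illustration and are not part of the patch:

```python
n_messages = 10  # assumed message count, for illustration only
for i in range(2, n_messages + 1):  # message id 1 was already sent above
    producer.send(i,                             # consecutive, unique message id
                  "processed/test_file_%d" % i,  # unique file name per message
                  b"hello",                      # binary payload
                  callback = callback)           # reuse the callback defined above
```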
diff --git a/docs/site/examples/getting_started/start_asapo_socket.sh b/docs/site/versioned_examples/version-21.06.0/start_asapo_socket.sh
old mode 100644
new mode 100755
similarity index 94%
rename from docs/site/examples/getting_started/start_asapo_socket.sh
rename to docs/site/versioned_examples/version-21.06.0/start_asapo_socket.sh
index d8f14743f8b732213ce6973b3348cc3e10d3d88e..9d899f035c998daa6af5033c2b59fc1c66b18a84
--- a/docs/site/examples/getting_started/start_asapo_socket.sh
+++ b/docs/site/versioned_examples/version-21.06.0/start_asapo_socket.sh
@@ -32,7 +32,7 @@ docker run --privileged --rm -v /var/run/docker.sock:/var/run/docker.sock \
   -e TF_VAR_mongo_dir=$MONGO_DIR \
   -e TF_VAR_asapo_user=$ASAPO_USER \
   -e ACL_ENABLED=true \
-  --name asapo --net=host -d yakser/asapo-cluster:@ASAPO_VERSION_IN_DOCS@
+  --name asapo --net=host -d yakser/asapo-cluster:21.06.0
 
 sleep 15
 
-docker exec asapo jobs-start -var elk_logs=false -var influxdb_version=1.8.4
\ No newline at end of file
+docker exec asapo jobs-start -var elk_logs=false -var influxdb_version=1.8.4
diff --git a/docs/site/examples/getting_started/start_asapo_tcp.sh b/docs/site/versioned_examples/version-21.06.0/start_asapo_tcp.sh
similarity index 94%
rename from docs/site/examples/getting_started/start_asapo_tcp.sh
rename to docs/site/versioned_examples/version-21.06.0/start_asapo_tcp.sh
index 5dd30bd2512f2d518dab08b260414154a808ed94..41fd0327cfcd9b1d9acd8ff807f33a8c07fcc66c 100644
--- a/docs/site/examples/getting_started/start_asapo_tcp.sh
+++ b/docs/site/versioned_examples/version-21.06.0/start_asapo_tcp.sh
@@ -21,7 +21,7 @@ ASAPO_USER=`id -u`:`id -g`
 
 mkdir -p $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED $DATA_GLOBAL_SHARED_ONLINE
 chmod 777 $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED $DATA_GLOBAL_SHARED_ONLINE
 
-cd $SERVICE_DATA_CLUSTER_SHARED
+cd $SERVICE_DATA_CLUSTER_SHARED
 mkdir -p fluentd grafana influxdb2 mongodb
 chmod 777 *
@@ -41,7 +41,7 @@ docker run --privileged --userns=host --security-opt no-new-privileges --rm \
   -v $DOCKER_TLS_KEY:/etc/nomad/key.pem \
   -v $DOCKER_TLS_CERT:/etc/nomad/cert.pem \
   -e DOCKER_ENDPOINT=$DOCKER_ENDPOINT \
-  --name asapo --net=host -d yakser/asapo-cluster:@ASAPO_VERSION_IN_DOCS@
+  --name asapo --net=host -d yakser/asapo-cluster:21.06.0
 
 sleep 15
 
 docker exec asapo jobs-start -var elk_logs=false
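
A note on how the reworked CodeBlock.tsx consumes these files: the `content` attribute on a code fence is now resolved relative to docs/site via require.context, a source link is built from the stash.desy.de URL, and when a `snippetTag` such as `#snippet1` is given, only the lines strictly between the `#snippet1 snippet_start` and `#snippet1 snippet_end` marker lines are rendered. A minimal sketch of a marked-up example file; the file name and contents are invented for illustration:

```python
# snippet_demo.py: hypothetical file under versioned_examples/version-21.06.0/
print("outside the markers, not rendered on the site")

#snippet1 snippet_start
print("only this block is rendered when snippetTag=\"#snippet1\" is set")
#snippet1 snippet_end

print("also not rendered")
```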
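
For readers unfamiliar with ed: the second loop of freeze_version.sh above rewrites every file under the frozen examples directory, replacing references to the 100.0~develop (or 100.0.develop) dev packages with the frozen docs version. A rough Python equivalent, with invented function and argument names; the real script uses in-place ed edits:

```python
import re
from pathlib import Path

def freeze_package_versions(versioned_examples: str, docs_version: str) -> None:
    # mirrors the ed command ,s/100\.0[~.]develop/${DOCS_VERSION}/g applied to every file
    for path in Path(versioned_examples).rglob("*"):
        if path.is_file():
            text = path.read_text()
            path.write_text(re.sub(r"100\.0[~.]develop", docs_version, text))

# usage, assuming the layout introduced by this change:
# freeze_package_versions("versioned_examples/version-21.06.0", "21.06.0")
```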