Skip to content
Snippets Groups Projects
Commit 75c63b1c authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge pull request #192 in ASAPO/asapo from docs to develop

* commit '0bdea405':
  update docs for 22.03.0
parents a7514198 0bdea405
No related branches found
No related tags found
No related merge requests found
import asapo_consumer
import asapo_producer
import json
from datetime import datetime, timedelta
def callback(payload, err):
    """Producer delivery callback: report the outcome of one send request.

    Args:
        payload: the request payload that was (or failed to be) sent.
        err: None on success, an ``asapo_producer.AsapoServerWarning`` when
            the server accepted the data with a warning, or any other error
            object on failure.
    """
    if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning):
        print("could not send: ", payload, err)
    elif err is not None:
        # a server warning: the message was still accepted
        print("sent with warning: ", payload, err)
    else:
        # fixed typo in the success message ("successfuly" -> "successfully")
        print("successfully sent: ", payload)
# Connection settings for a local ASAP::O test deployment.
endpoint = "localhost:8400"
beamtime = "asapo_test"
# Bearer token for the test beamtime; the long literal is split across
# lines (adjacent string literals concatenate; str() is a no-op here).
token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e"
"yJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZ"
"wOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ"
"2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJ"
"dfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU")
path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"
# NOTE(review): positional args appear to be (endpoint, processing type,
# beamtime, beamline, data source, token, nthreads, timeout_ms) - confirm
# against the asapo_producer API documentation.
producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', 'test_source', '', 1, 60000)
# suppress everything below error level in the producer's own logging
producer.set_log_level('error')
# Produce ten messages, each carrying user metadata alongside its payload.
for msg_id in range(1, 11):
    user_metadata = json.dumps({
        'condition': 'condition #' + str(msg_id),
        'somevalue': msg_id * 10,
    })
    file_name = "processed/test_file_" + str(msg_id)
    payload = ('message #' + str(msg_id)).encode()
    producer.send(msg_id, file_name, payload,
                  user_meta=user_metadata, stream="default", callback=callback)
# block until every queued send request has completed (2 s timeout)
producer.wait_requests_finished(2000)
# NOTE(review): the True flag presumably enables direct filesystem access to
# path_to_files, and 5000 is the operation timeout in ms - confirm against
# the asapo_consumer API documentation.
consumer = asapo_consumer.create_consumer(endpoint, path_to_files, True, beamtime, "test_source", token, 5000)
# helper function to print messages
def print_messages(metadatas):
    """Print id, payload and user metadata for each query result.

    A query returns only metadata records; the message body has to be
    fetched separately through the consumer.
    """
    for record in metadatas:
        # fetch the actual message body belonging to this metadata record
        payload = consumer.retrieve_data(record)
        print('Message #', record['_id'],
              ', content:', payload.tobytes().decode("utf-8"),
              ', usermetadata:', record['meta'])
# Demonstrate SQL-like metadata queries. The "snippet_start"/"snippet_end"
# comment pairs are documentation-extraction anchors - do not remove them.
# by_id snippet_start
# simple query, same as get_by_id
metadatas = consumer.query_messages('_id = 1')
# by_id snippet_end
print('Message with ID = 1')
print_messages(metadatas)
# by_ids snippet_start
# the query that requests the range of IDs
metadatas = consumer.query_messages('_id >= 8')
# by_ids snippet_end
print('Messages with ID >= 8')
print_messages(metadatas)
# string_equal snippet_start
# the query that has some specific requirement for message metadata
metadatas = consumer.query_messages('meta.condition = "condition #7"')
# string_equal snippet_end
print('Message with condition = "condition #7"')
print_messages(metadatas)
# int_compare snippet_start
# the query that has several requirements for user metadata
metadatas = consumer.query_messages('meta.somevalue > 30 AND meta.somevalue < 60')
# int_compare snippet_end
print('Message with 30 < somevalue < 60')
print_messages(metadatas)
# timestamp snippet_start
# the query that is based on the message's timestamp
# NOTE(review): datetime.now() is naive local time; if the server stores
# timestamps in UTC this window may be off by the local offset - confirm.
now = datetime.now()
fifteen_minutes_ago = now - timedelta(minutes = 15)
# python uses timestamp in seconds, while ASAP::O in nanoseconds, so we need to multiply it by a billion
metadatas = consumer.query_messages('timestamp < {} AND timestamp > {}'.format(now.timestamp() * 10**9, fifteen_minutes_ago.timestamp() * 10**9))
# timestamp snippet_end
print('Messages in the last 15 minutes')
print_messages(metadatas)
#!/usr/bin/env bash
# Start a single-node ASAP::O 22.03.0 cluster in one Docker container,
# preparing the shared host directories it needs first. This variant talks
# to the local Docker daemon through the mounted /var/run/docker.sock.
set -e

ASAPO_HOST_DIR=/var/tmp/asapo # you can change this if needed, make sure there is enough space ( >3GB on disk)

# host paths shared with the containerized services
NOMAD_ALLOC_HOST_SHARED=$ASAPO_HOST_DIR/container_host_shared/nomad_alloc
SERVICE_DATA_CLUSTER_SHARED=$ASAPO_HOST_DIR/asapo_cluster_shared/service_data
DATA_GLOBAL_SHARED=$ASAPO_HOST_DIR/global_shared/data
DATA_GLOBAL_SHARED_ONLINE=$ASAPO_HOST_DIR/global_shared/online_data
MONGO_DIR=$SERVICE_DATA_CLUSTER_SHARED/mongodb

# run the services as the invoking user; $(...) instead of legacy backticks
ASAPO_USER=$(id -u):$(id -g)

# shared directories must exist and be writable by the services;
# expansions are quoted so the script survives paths with spaces (SC2086)
mkdir -p "$NOMAD_ALLOC_HOST_SHARED" "$SERVICE_DATA_CLUSTER_SHARED" "$DATA_GLOBAL_SHARED" "$DATA_GLOBAL_SHARED_ONLINE"
chmod 777 "$NOMAD_ALLOC_HOST_SHARED" "$SERVICE_DATA_CLUSTER_SHARED" "$DATA_GLOBAL_SHARED" "$DATA_GLOBAL_SHARED_ONLINE"

cd "$SERVICE_DATA_CLUSTER_SHARED"
mkdir -p fluentd grafana influxdb influxdb2 mongodb prometheus alertmanager
chmod 777 *

# mount the Docker socket so processes inside the container can control the
# host Docker daemon; --group-add grants the docker group's gid
docker run --privileged --rm -v /var/run/docker.sock:/var/run/docker.sock \
    -u "$ASAPO_USER" \
    --group-add "$(getent group docker | cut -d: -f3)" \
    -v "$NOMAD_ALLOC_HOST_SHARED:$NOMAD_ALLOC_HOST_SHARED" \
    -v "$SERVICE_DATA_CLUSTER_SHARED:$SERVICE_DATA_CLUSTER_SHARED" \
    -v "$DATA_GLOBAL_SHARED:$DATA_GLOBAL_SHARED" \
    -e "NOMAD_ALLOC_DIR=$NOMAD_ALLOC_HOST_SHARED" \
    -e "TF_VAR_service_dir=$SERVICE_DATA_CLUSTER_SHARED" \
    -e "TF_VAR_online_dir=$DATA_GLOBAL_SHARED_ONLINE" \
    -e "TF_VAR_offline_dir=$DATA_GLOBAL_SHARED" \
    -e "TF_VAR_mongo_dir=$MONGO_DIR" \
    -e "TF_VAR_asapo_user=$ASAPO_USER" \
    -e ACL_ENABLED=true \
    --name asapo --net=host -d yakser/asapo-cluster:22.03.0

# give the container time to boot, then start the ASAP::O jobs inside it
sleep 15
docker exec asapo jobs-start
#!/usr/bin/env bash
# Start a single-node ASAP::O 22.03.0 cluster in one Docker container.
# This variant talks to a TLS-protected Docker daemon (DOCKER_ENDPOINT)
# instead of mounting the local Docker socket.
set -e

ASAPO_HOST_DIR=/var/tmp/asapo # you can change this if needed, make sure there is enough space ( >3GB on disk)

# change this according to your Docker configuration
DOCKER_ENDPOINT="127.0.0.1:2376"
DOCKER_TLS_CA=/usr/local/docker/certs/$USER/ca.pem
DOCKER_TLS_KEY=/usr/local/docker/certs/$USER/key.pem
DOCKER_TLS_CERT=/usr/local/docker/certs/$USER/cert.pem

# host paths shared with the containerized services
NOMAD_ALLOC_HOST_SHARED=$ASAPO_HOST_DIR/container_host_shared/nomad_alloc
SERVICE_DATA_CLUSTER_SHARED=$ASAPO_HOST_DIR/asapo_cluster_shared/service_data
DATA_GLOBAL_SHARED=$ASAPO_HOST_DIR/global_shared/data
DATA_GLOBAL_SHARED_ONLINE=$ASAPO_HOST_DIR/global_shared/online_data
MONGO_DIR=$SERVICE_DATA_CLUSTER_SHARED/mongodb

# run the services as the invoking user; $(...) instead of legacy backticks
ASAPO_USER=$(id -u):$(id -g)

mkdir -p "$NOMAD_ALLOC_HOST_SHARED" "$SERVICE_DATA_CLUSTER_SHARED" "$DATA_GLOBAL_SHARED" "$DATA_GLOBAL_SHARED_ONLINE"
chmod 777 "$NOMAD_ALLOC_HOST_SHARED" "$SERVICE_DATA_CLUSTER_SHARED" "$DATA_GLOBAL_SHARED" "$DATA_GLOBAL_SHARED_ONLINE"

# FIX: was 'cd $SERVICE_DATA_CLUSTER_SHAREDdetector' - that expands an
# undefined variable, so cd fell through to $HOME and the service
# subdirectories were created in the wrong place.
cd "$SERVICE_DATA_CLUSTER_SHARED"
mkdir -p fluentd grafana influxdb influxdb2 mongodb prometheus alertmanager
chmod 777 *

docker run --privileged --userns=host --security-opt no-new-privileges --rm \
    -u "$ASAPO_USER" \
    -v "$NOMAD_ALLOC_HOST_SHARED:$NOMAD_ALLOC_HOST_SHARED" \
    -v "$SERVICE_DATA_CLUSTER_SHARED:$SERVICE_DATA_CLUSTER_SHARED" \
    -v "$DATA_GLOBAL_SHARED:$DATA_GLOBAL_SHARED" \
    -e "NOMAD_ALLOC_DIR=$NOMAD_ALLOC_HOST_SHARED" \
    -e "TF_VAR_service_dir=$SERVICE_DATA_CLUSTER_SHARED" \
    -e "TF_VAR_online_dir=$DATA_GLOBAL_SHARED_ONLINE" \
    -e "TF_VAR_offline_dir=$DATA_GLOBAL_SHARED" \
    -e "TF_VAR_mongo_dir=$MONGO_DIR" \
    -e "TF_VAR_asapo_user=$ASAPO_USER" \
    -e ACL_ENABLED=true \
    -v "$DOCKER_TLS_CA":/etc/nomad/ca.pem \
    -v "$DOCKER_TLS_KEY":/etc/nomad/key.pem \
    -v "$DOCKER_TLS_CERT":/etc/nomad/cert.pem \
    -e "DOCKER_ENDPOINT=$DOCKER_ENDPOINT" \
    --name asapo --net=host -d yakser/asapo-cluster:22.03.0

# give the container time to boot, then start the ASAP::O jobs inside it
sleep 15
docker exec asapo jobs-start
{
"version-22.03.0/docs": [
{
"type": "doc",
"id": "version-22.03.0/getting-started"
},
{
"type": "doc",
"id": "version-22.03.0/overview"
},
{
"type": "doc",
"id": "version-22.03.0/compare-to-others"
},
{
"collapsed": true,
"type": "category",
"label": "Concepts And Architecture",
"items": [
{
"type": "doc",
"id": "version-22.03.0/data-in-asapo"
},
{
"type": "doc",
"id": "version-22.03.0/producer-clients"
},
{
"type": "doc",
"id": "version-22.03.0/consumer-clients"
},
{
"type": "doc",
"id": "version-22.03.0/core-architecture"
}
]
},
{
"collapsed": true,
"type": "category",
"label": "Use Cases",
"items": [
{
"type": "doc",
"id": "version-22.03.0/p02.1"
}
]
},
{
"collapsed": true,
"type": "category",
"label": "Code Examples",
"items": [
{
"type": "doc",
"id": "version-22.03.0/cookbook/overview"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/simple-producer"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/simple-consumer"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/simple-pipeline"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/datasets"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/acknowledgements"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/metadata"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/next_stream"
},
{
"type": "doc",
"id": "version-22.03.0/cookbook/query"
}
]
}
]
}
[
"22.03.0",
"21.12.0",
"21.09.0",
"21.06.0"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment