Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
FS-SC
AsapoWorker
Commits
cb06c034
Commit
cb06c034
authored
May 17, 2021
by
Mikhail Karnevskiy
Browse files
Feat/acknowledge slim
parent
bdfe112c
Changes
4
Hide whitespace changes
Inline
Side-by-side
src/AsapoWorker/application.py
View file @
cb06c034
import
argparse
import
binascii
import
logging
import
signal
import
sys
from
AsapoWorker.configuration
import
(
create_instance_from_configurable
,
create_cli_parser_from_configurable_class
,
...
...
@@ -75,7 +77,7 @@ class Application:
help
=
"Scheme to chose new stream name"
)
parser
.
add_argument
(
"--stream"
,
type
=
str
,
default
=
"
default
"
,
metavar
=
"STR"
,
"--stream"
,
type
=
str
,
default
=
""
,
metavar
=
"STR"
,
help
=
"The stream where streaming should start"
)
create_cli_parser_from_configurable_class
(
...
...
@@ -139,6 +141,7 @@ class Application:
if
not
self
.
sender_consumer
:
self
.
sender_consumer
=
sender
.
consumer
self
.
sender_producer
=
sender
.
producer
sender
.
_acknowledge_received
=
consumer
.
acknowledge
else
:
sender
=
None
...
...
@@ -149,17 +152,14 @@ class Application:
self
.
options
[
"metadata_receiver"
])
if
not
stream
:
if
"stream"
in
self
.
options
:
stream
=
self
.
options
[
"stream"
]
if
stream
:
log
.
info
(
"Setting stream=%s"
,
stream
)
consumer
.
stream
=
stream
if
sender
:
sender
.
stream
=
stream
stream
=
self
.
get_starting_stream
(
consumer
)
_set_start_position
(
self
.
options
,
consumer
,
sender
,
self
.
worker_class
)
log
.
info
(
"Setting stream=%s"
,
stream
)
consumer
.
stream
=
stream
if
sender
:
sender
.
stream
=
stream
worker
=
create_instance_from_configurable
(
self
.
worker_class
,
self
.
options
[
"worker"
])
...
...
@@ -170,6 +170,7 @@ class Application:
# id will be ignored and the start id will be calculated from the
# acknowledged ids
_unset_start_position
(
self
.
options
)
worker
.
_acknowledge
=
consumer
.
acknowledge
if
sender
:
worker
.
sender
=
sender
...
...
@@ -191,6 +192,24 @@ class Application:
return
streamer
def
get_starting_stream
(
self
,
consumer
):
log
.
info
(
"Get starting receiver stream"
)
if
self
.
options
[
"stream"
]
!=
''
:
return
self
.
options
[
"stream"
]
try
:
sender_data_source
=
self
.
options
[
"sender"
][
"data_source"
]
group_id
=
str
(
binascii
.
crc32
(
sender_data_source
.
encode
()))
streams
=
consumer
.
get_stream_list
()
for
stream_info
in
streams
:
msgs
=
consumer
.
get_unacknowledged_messages
(
group_id
,
stream
=
stream_info
[
"name"
])
if
msgs
:
log
.
info
(
"Earliest non-processed stream=%s, number of messages=%s"
,
stream_info
[
"name"
],
len
(
msgs
))
return
stream_info
[
"name"
]
except
Exception
as
e
:
log
.
error
(
"Getting non-processed receiver stream fails messages=%s"
,
e
)
return
'default'
def
run
(
self
):
if
not
self
.
initialized
:
self
.
initialize
()
...
...
src/AsapoWorker/asapo_receiver.py
View file @
cb06c034
import
logging
import
binascii
import
asapo_consumer
from
AsapoWorker.configurable
import
Configurable
,
Config
from
AsapoWorker.errors
import
(
...
...
@@ -117,6 +118,29 @@ class SimpleAsapoReceiver:
except
asapo_consumer
.
AsapoConsumerError
as
err
:
raise
StreamError
(
"Failed to get stream info"
)
from
err
def
acknowledge
(
self
,
name
,
ids
):
for
ack_id
in
ids
:
try
:
group_id
=
str
(
binascii
.
crc32
(
name
.
encode
()))
log
.
info
(
"Acknowledging id=%s stream=%s group_id=%s"
,
ack_id
,
self
.
stream
,
group_id
)
self
.
consumer
.
acknowledge
(
group_id
,
ack_id
,
stream
=
self
.
stream
)
except
Exception
as
e
:
log
.
warning
(
"Acknowledging of id=%s stream=%s group_id=%s fails. Reason=%s"
,
ack_id
,
self
.
stream
,
group_id
,
e
)
def
get_unacknowledged_messages
(
self
,
group_id
,
stream
):
try
:
msgs
=
self
.
consumer
.
get_unacknowledged_messages
(
group_id
,
stream
=
stream
)
return
msgs
except
asapo_consumer
.
AsapoConsumerError
as
err
:
raise
StreamError
(
"Failed to get unacknowledged messages"
)
from
err
def
get_last_acknowledged_message
(
self
,
group_id
):
try
:
return
self
.
consumer
.
get_last_acknowledged_message
(
group_id
,
stream
=
self
.
stream
)
except
Exception
as
err
:
raise
StreamError
(
"Failed to get unacknowledged messages"
)
from
err
# TODO: Ensure also that indices are consecutive or start at 0
@
Configurable
...
...
@@ -295,7 +319,7 @@ class SerialDatasetAsapoReceiver(SerialAsapoReceiver):
A wrapper for an ASAP::O consumer for dataset processing
This wrapper supports functionality of SerialAsapoReceiver but
expects to receive a dataset
expects to receive a dataset
"""
def
_get_next
(
self
,
meta_only
):
...
...
src/AsapoWorker/asapo_sender.py
View file @
cb06c034
...
...
@@ -12,7 +12,7 @@ log = logging.getLogger(__name__)
def
create_producer
(
source
,
source_type
,
beamtime
,
beamline
,
data_source
,
token
,
nthreads
=
1
,
timeout_producer
=
30000
):
timeout
=
timeout_producer
/
1000
timeout
=
timeout_producer
log
.
info
(
"Create new producer (source=%s, type=%s, beamtime=%s, beamline=%s, "
"data_source=%s, token=%s, nthreads=%i, timeout=%s)."
,
...
...
@@ -96,6 +96,8 @@ class AsapoSender:
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown"
,
type
=
float
,
default
=
3000
)
))
data_source
=
Config
(
"Name of output data_source"
,
type
=
str
,
default
=
""
)
stream
=
Config
(
"The name of the stream."
,
type
=
str
,
default
=
"default"
,
init
=
False
)
ingest_mode
=
Config
(
...
...
@@ -118,10 +120,16 @@ class AsapoSender:
"Length of queue of data waiting to be sent"
,
type
=
int
,
default
=
0
,
init
=
False
)
_lock
=
Config
(
"Used internally for concurrent access
to the n_queued attribute
"
,
"Used internally for concurrent access
from callbacks
"
,
factory
=
threading
.
Lock
,
init
=
False
)
_group_id
=
Config
(
"Generated group id for the ASAP::O consumer"
,
type
=
str
,
init
=
False
)
_ids_to_acknowledge
=
Config
(
"Dictionary mapping output ids to input ids that need to be "
"acknowledged when sending is successful"
,
factory
=
dict
,
init
=
False
)
_acknowledge_received
=
Config
(
"Callback to acknowledge id"
,
init
=
False
,
default
=
None
)
@
_group_id
.
default
def
_generate_group_id
(
self
):
...
...
@@ -145,12 +153,20 @@ class AsapoSender:
def
__attrs_post_init__
(
self
):
log
.
info
(
"Receiver created with ingest_mode=%s"
,
self
.
ingest_mode
)
def
send_data
(
self
,
data
,
metadata
):
def
send_data
(
self
,
data
,
metadata
,
acknowledge
=
None
,
ack_dependencies
=
None
):
log
.
info
(
"Sending data with id=%s name=%s"
,
metadata
[
"_id"
],
metadata
[
"name"
])
with
self
.
_lock
:
self
.
_n_queued
+=
1
if
ack_dependencies
is
None
:
ack_dependencies
=
set
([
metadata
[
"_id"
]])
if
acknowledge
is
None
:
acknowledge
=
[]
for
out_id
in
acknowledge
:
if
out_id
not
in
self
.
_ids_to_acknowledge
:
self
.
_ids_to_acknowledge
[
out_id
]
=
ack_dependencies
try
:
self
.
producer
.
send
(
metadata
[
'_id'
],
metadata
[
'name'
],
data
,
...
...
@@ -180,15 +196,30 @@ class AsapoSender:
def
_callback
(
self
,
header
,
err
):
header
=
{
key
:
val
for
key
,
val
in
header
.
items
()
if
key
!=
'data'
}
out_id
=
header
[
"id"
]
with
self
.
_lock
:
assert
self
.
_n_queued
>
0
self
.
_n_queued
-=
1
ids_to_acknowledge
=
self
.
get_ids_to_acknowledge
(
out_id
)
if
err
:
log
.
error
(
"Sending data failed for header=%s with error='%s'"
,
header
,
err
)
else
:
log
.
info
(
"Successfully sent data for header=%s"
,
header
)
if
self
.
_acknowledge_received
:
self
.
_acknowledge_received
(
self
.
data_source
,
ids_to_acknowledge
)
def
get_ids_to_acknowledge
(
self
,
out_id
):
id_to_acknowledge
=
[]
for
in_id
,
ack_ids
in
self
.
_ids_to_acknowledge
.
items
():
ack_ids
.
discard
(
out_id
)
if
not
ack_ids
:
id_to_acknowledge
.
append
(
in_id
)
for
in_id
in
id_to_acknowledge
:
del
self
.
_ids_to_acknowledge
[
in_id
]
return
id_to_acknowledge
def
wait
(
self
,
timeout
=
10
):
with
self
.
_lock
:
...
...
@@ -217,7 +248,7 @@ class AsapoSender:
log
.
info
(
"Successfully finished sending all queued data"
)
def
get_last
(
self
,
meta_only
=
True
):
log
.
info
(
"Requesting last record
"
)
log
.
info
(
"Requesting last record
for stream=%s"
,
self
.
stream
)
try
:
data
,
metadata
=
self
.
consumer
.
get_last
(
meta_only
=
meta_only
,
stream
=
self
.
stream
)
except
asapo_consumer
.
AsapoEndOfStreamError
:
...
...
src/AsapoWorker/worker.py
View file @
cb06c034
import
json
import
logging
from
AsapoWorker.data_handler
import
get_filename_parts
from
AsapoWorker.asapo_sender
import
AsapoSender
from
AsapoWorker.configurable
import
Configurable
,
Config
from
AsapoWorker.errors
import
ConfigurationError
log
=
logging
.
getLogger
(
__name__
)
def
to_set
(
a
):
if
a
is
None
or
isinstance
(
a
,
set
):
return
a
else
:
return
set
(
a
)
@
Configurable
class
Worker
:
...
...
@@ -13,6 +23,8 @@ class Worker:
meta_only
=
Config
(
"Get only metadata from receiver"
,
type
=
bool
,
default
=
False
,
init
=
False
)
_acknowledge
=
Config
(
"Callback to acknowledge id"
,
init
=
False
,
default
=
None
)
def
handle_error
(
self
):
pass
...
...
@@ -68,13 +80,17 @@ class Worker:
"""
pass
def
send
(
self
,
data
,
metadata
):
def
send
(
self
,
data
,
metadata
,
acknowledge
=
None
,
ack_dependencies
=
None
):
if
self
.
sender
:
self
.
sender
.
send_data
(
data
,
metadata
)
self
.
sender
.
send_data
(
data
,
metadata
,
acknowledge
=
acknowledge
,
ack_dependencies
=
to_set
(
ack_dependencies
))
else
:
raise
ConfigurationError
(
"Worker wants to send data, but no sender configured!"
)
def
send_acknowledgement
(
self
,
acknowledge
=
None
):
self
.
_acknowledge
(
self
.
sender
.
data_source
,
acknowledge
)
@
Configurable
(
kw_only
=
True
)
class
SimpleWorker
(
Worker
):
...
...
@@ -135,7 +151,7 @@ class SimpleWorker(Worker):
raise
ConfigurationError
(
"Worker wants to send data, but no sender configured!"
)
def
send
(
self
,
data
,
metadata
,
extra_meta
=
None
):
def
send
(
self
,
data
,
metadata
,
extra_meta
=
None
,
acknowledge
=
None
):
"""Send the data to the configured output stream
The metadata and extra_meta arguments are passed to
...
...
@@ -150,10 +166,11 @@ class SimpleWorker(Worker):
extra_meta: dict (optional)
If given, the 'meta' entry of the output metadata is updated with
the content of this dict
acknowledge: list if receiver message ids to be acknowledged
"""
new_metadata
=
self
.
get_output_metadata
(
metadata
,
extra_meta
)
super
().
send
(
data
,
new_metadata
)
super
().
send
(
data
,
new_metadata
,
acknowledge
=
acknowledge
)
@
Configurable
(
kw_only
=
True
)
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment