Commit 674507fc authored by Tim Schoof's avatar Tim Schoof
Browse files

Add SimpleWorker class

parent bc74cd9f
import json
from AsapoWorker.data_handler import get_filename_parts
from AsapoWorker.asapo_sender import AsapoSender
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import ConfigurationError
......@@ -32,6 +34,75 @@ class Worker:
"Worker wants to send data, but no sender configured!")
@Configurable(kw_only=True)
class SimpleWorker(Worker):
"""Worker class for simple, independent processing of single images"""
output_name_format = Config(
"Format for deriving output name from input name",
type=str, default="{basename}_processed-{index}")
def get_output_name(self, metadata):
"Get the output name from output_name_format and the input metadata"
base, index, ext = get_filename_parts(metadata)
return self.output_name_format.format(
basename=base, index=index)
def get_output_type(self):
"Return the type of the output data. Used as the file extension"
raise NotImplementedError("Output type not known")
def get_output_metadata(self, metadata, extra_meta=None):
"""Get output metadata from input metadata
The new metadata is derived from the given metadata of the input data.
The record's user metadata is updated with extra_meta, if given.
Parameters
----------
metadata: dict
Metadata of the input. The output metadata is derived from this
extra_meta: dict (optional)
If given, the 'meta' entry of the output metadata is updated with
the content of this dict
"""
input_names = [metadata["name"]]
input_ids = [metadata["_id"]]
acknowledged_ids = input_ids
new_meta = dict(
input_names=input_names,
input_ids=input_ids,
acknowledged_ids=acknowledged_ids)
if extra_meta:
new_meta.update(extra_meta)
new_meta_json = json.dumps(new_meta)
new_metadata = dict(
_id=metadata["_id"],
name=self.get_output_name(metadata) + "." + self.get_output_type(),
meta=new_meta_json)
return new_metadata
def send(self, data, metadata, extra_meta=None):
"""Send the data to the configured output stream
The metadata and extra_meta arguments are passed to
get_output_metadata. Its return value is sent as the output metadata.
Parameters
----------
data: bytes
The data to send
metadata: dict
Metadata of the input. The output metadata is derived from this
extra_meta: dict (optional)
If given, the 'meta' entry of the output metadata is updated with
the content of this dict
"""
new_metadata = self.get_output_metadata(metadata, extra_meta)
super().send(data, new_metadata)
@Configurable(kw_only=True)
class SerialWorker(Worker):
start_id = Config(
......
import json
from AsapoWorker.worker import SimpleWorker
class DummySimpleWorker(SimpleWorker):
def get_output_type(self):
return "h5"
def test_get_output_metadata():
simple_worker = DummySimpleWorker(
output_name_format="{basename}_dummy-{index}")
metadata = dict(
_id=5,
name="foo-4.numpy")
extra_meta = dict(extra_entry=42)
output_metadata = simple_worker.get_output_metadata(
metadata, extra_meta=extra_meta)
output_metadata["meta"] = json.loads(output_metadata["meta"])
expected_output_metadata = dict(
_id=5,
name="foo_dummy-4.h5",
meta=dict(
input_names=["foo-4.numpy"],
input_ids=[5],
acknowledged_ids=[5],
extra_entry=42))
assert output_metadata == expected_output_metadata
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment