Commit a286a183 authored by Jerome Carnis's avatar Jerome Carnis 🦊
Browse files

Move Eiger to a different module, add the EigerScanner and EigerSwTrigger classes

parent c20d2b1e
Pipeline #28049 failed with stages
in 5 minutes and 9 seconds
......@@ -4,7 +4,7 @@ from collections import deque
from copy import copy
from math import inf
from threading import Thread
from typing import Any, Callable, Tuple, List, Optional, Union, TYPE_CHECKING
from typing import Any, Callable, Dict, Tuple, List, Optional, Union, TYPE_CHECKING
import numpy as np
import oyaml as yaml
......@@ -383,17 +383,21 @@ class Device(Subject, YamlSerializable):
kamzik3.session.publisher.push_message(header, attribute_path)
def share_group(
self, source_device, source_group, target_group, attribute_name_mask=None
self,
source_device,
source_group: Optional[str],
target_group: str,
attribute_name_mask: Optional[Dict] = None,
):
"""
Share all attributes in group.
Filter by attribute_name_mask from source_device.
:param Device source_device: Source Device object
:param str source_group: Name of the group to share
:param source_device: Source Device object
:param source_group: Name of the group to share
:param str target_group: Target group on this Device
:param dict attribute_name_mask: Use this, if You need to rename Attribute
:param attribute_name_mask: Use this, if You need to rename Attribute
"""
if attribute_name_mask is None:
attribute_name_mask = {}
......
import datetime
import os
import time
from time import sleep
from threading import Thread
from typing import Any, Dict, Optional
import numpy as np
import kamzik3
from kamzik3 import DeviceError, units
from kamzik3.constants import (
ATTR_FRAME_COUNT,
ATTR_MACRO_PREFIX,
ATTR_SCAN_COUNT,
ATTR_STATUS,
ATTR_TRIGGERS_GENERATED,
ATTR_TRIGGER_PULSE_WIDTH,
ATTR_TRIGGERS_SP,
STATUS_BUSY,
STATUS_CONFIGURED,
STATUS_IDLE,
VALUE,
)
from kamzik3.devices.device import Device
from kamzik3.devices.deviceTango import DeviceTango
from kamzik3.devices.general.deviceScanner import DeviceScanner
from kamzik3.macro.scan import Scan
from kamzik3.snippets.snippetsDecorators import expose_method
class DeviceTangoEigerDetector(DeviceTango):
"""
Implementation of the Eiger detector.
:param path: path of the Tango server, e.g.
tango://haspp11oh:10000/p11/simplon_detector/eh.01
:param file_writer: instance of a Simplon FileWriter Tango device
:param eiger_stream: instance of a Simplon Stream Tango device
:param device_id: str, the name to give to this device
:param config: a dictionary of configuration parameters
"""
def __init__(
self, path, file_writer=None, eiger_stream=None, device_id=None, config=None
):
self.file_writer = file_writer
self.eiger_stream = eiger_stream
DeviceTango.__init__(self, path, device_id, config)
def handle_configuration(self):
"""Configure the Eiger device."""
try:
super().handle_configuration()
self.check_interface()
if self.file_writer is not None:
self.share_group(self.file_writer, None, "FileWriter")
if self.eiger_stream is not None:
self.share_group(self.eiger_stream, None, "EigerStream")
except self.tango_exceptions:
# Exception was handled on DeviceTango level
pass
def check_interface(self):
"""
Check the configuration of the Eiger interface.
Either the FileWriter or the Streaming interface has to be enabled (not both).
Mode 0=enabled, 1=Disabled
"""
if (
self.file_writer is not None
and self.file_writer.get_value("Mode") == "0"
and self.eiger_stream is not None
and self.eiger_stream.get_value("Mode") == "0"
):
raise DeviceError(
"Enable either the File Writer or the Streaming Interface, not both"
)
if self.file_writer is None and self.eiger_stream is None:
raise DeviceError("No interface provided for the Eiger!")
def _init_attributes(self):
DeviceTango._init_attributes(self)
self.create_attribute("shotDir")
self.create_attribute("scanDir")
self.create_attribute("scanFilenamePattern")
self.create_attribute(
ATTR_FRAME_COUNT,
default_value=0,
readonly=True,
description="Internal frame counter",
default_type=np.uint64,
min_value=0,
)
def _config_interface(self, saving_directory, prefix, nb_frames=1):
"""
Configure the Eiger interface.
:param saving_directory: str, the name of the saving directory
:param prefix: str, prefix to add to the file names (filewriter) or folder name
for the dataset (stream subsystem)
:param nb_frames: int, number of frames to be saved per .h5 file for the
Filewiter
"""
now = datetime.datetime.now()
if self.file_writer is not None:
path = os.path.join(
self.get_value(saving_directory),
f"{prefix}_{self.device_id}_{now.strftime('%Y-%m-%d_%H:%M:%S')}",
)
self.set_attribute(["FileWriter", "Mode", VALUE], 0)
self.set_attribute(["FileWriter", "NamePattern", VALUE], path)
self.set_attribute(["FileWriter", "NimagesPerFile", VALUE], nb_frames)
else: # use the Stream subsystem
path = '{"series_name": "' + prefix + '"}'
self.set_attribute(["EigerStream", "Mode", VALUE], 0)
self.set_attribute(["EigerStream", "HeaderAppendix", VALUE], path)
self.set_attribute(["EigerStream", "ImageAppendix", VALUE], path)
@expose_method({"Name": "FilePrefix", "Exposure": "FrameTime", "Frames": "Nimages"})
def acquire_frame(self, Name, exposure, frames):
frames = int(self.to_device_unit("Nimages", frames).m)
self._config_interface(
saving_directory="shotDir", prefix=Name, nb_frames=frames
)
exposure = float(self.to_device_unit("FrameTime", exposure).m)
# According to Jan 10ms is lowest frame time
frame_time = 10e-3 if exposure < 10e-3 else exposure
self.set_attribute(["TriggerMode", VALUE], 3)
self.set_attribute(["FrameTime", VALUE], frame_time)
self.set_attribute(["CountTime", VALUE], exposure)
self.set_attribute(["Ntrigger", VALUE], 1)
self.set_attribute(["Nimages", VALUE], frames)
self.Arm()
self.Trigger()
self.set_value(ATTR_FRAME_COUNT, self.get_value(ATTR_FRAME_COUNT) + frames)
@expose_method({"Exposure": "FrameTime"})
def live_view(self, exposure):
"""
Live view of the Eiger detector.
This is useful when aligning the optics or the sample. Images will not be saved.
:param exposure: desired exposure time per frame
"""
self.logger.info("Live view has started.")
self.set_attribute(["TriggerMode", VALUE], 3)
if self.file_writer is not None:
self.logger.info("FileWriter disabled")
self.set_attribute(["FileWriter", "Mode", VALUE], 1)
self.set_attribute(["FileWriter", "NimagesPerFile", VALUE], 1000000)
if self.eiger_stream is not None:
self.logger.info("EigerStream disabled")
self.set_attribute(["EigerStream", "Mode", VALUE], 1)
exposure = float(self.to_device_unit("FrameTime", exposure).m)
# According to Jan Meyer 10ms is the lowest frame time
frame_time = 10e-3 if exposure < 10e-3 else exposure
self.set_attribute(["FrameTime", VALUE], frame_time)
self.set_attribute(["CountTime", VALUE], exposure)
self.set_attribute(["Ntrigger", VALUE], 1)
self.set_attribute(["Nimages", VALUE], 1000000)
self.Arm()
self.Trigger()
self.set_value(ATTR_FRAME_COUNT, self.get_value(ATTR_FRAME_COUNT) + 1000000)
@expose_method()
def stop(self):
if self.get_value(ATTR_STATUS) == STATUS_BUSY:
self.logger.info("Acquisition aborted")
self.Abort()
self.Disarm()
class EigerScanner(DeviceScanner):
def __init__(
self,
detector: DeviceTangoEigerDetector,
device_id: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
):
self.detector = detector
self.scanned_devices = []
DeviceScanner.__init__(self, device_id, config)
self.connect()
def _recount_steps(self, parent_macro, to_link=None):
total_frames = 1
for link in parent_macro.chain:
if link == to_link:
break
if isinstance(link, Scan) and total_frames > 0:
repeat_step = link.step_attributes.get("repeat_count", 0) + 1
repeat_scan = link.repeat_count + 1
total_frames *= (link.steps_count + 1) * repeat_step * repeat_scan
else:
total_frames *= link.get_total_points_count()
return total_frames
@expose_method()
def get_scanner_attributes(self):
return []
def on_macro_done(self):
self.detector.stop()
@expose_method()
def get_scanner_macro(self, scanner_input, scanner_attributes, parent_macro=None):
macro_server = kamzik3.session.get_device("MacroServer")
current_scan_count = macro_server.get_macro_count()
if current_scan_count != self.last_scan_count:
self._init_new_scan(scanner_input, scanner_attributes, parent_macro)
self.last_scan_count = current_scan_count
scanner_input.on_macro_done = self.on_macro_done
return scanner_input
def _init_new_scan(self, scanner_input, scanner_attributes, parent_macro):
self.logger.info(
"Initiating new scan number {}".format(
kamzik3.session.get_value(ATTR_SCAN_COUNT)
)
)
macro_server = kamzik3.session.get_device("MacroServer")
scan_prefix = macro_server.get_value(ATTR_MACRO_PREFIX)
scan_count = macro_server.get_macro_count()
scan_dir_name = "{}_{}".format(scan_prefix, scan_count - 1)
params = scanner_input.method_parameters
exposure_time = float(units.Quantity(params["Exposure"]).to("s").m)
frame_time = 10e-3 if exposure_time < 10e-3 else exposure_time
self.detector.set_attribute(["FrameTime", VALUE], frame_time)
self.detector.set_attribute(["CountTime", VALUE], exposure_time)
frames_count = int(float(params["Frames"]))
total_frames = self._recount_steps(parent_macro, scanner_input) * frames_count
self.detector.set_value(ATTR_FRAME_COUNT, 0)
self.detector.set_attribute(["TriggerMode", VALUE], 3)
self.detector.set_attribute(["Nimages", VALUE], frames_count)
self.detector.set_attribute(["Ntrigger", VALUE], total_frames)
path = os.path.join(
self.detector.get_value("scanDir"), scan_dir_name, f"{scan_dir_name}"
)
self.detector.set_attribute(["FileWriter", "Mode", VALUE], 0)
self.detector.set_attribute(["FileWriter", "NamePattern", VALUE], path)
self.detector.set_attribute(["FileWriter", "NimagesPerFile", VALUE], 1000)
self.detector.Arm()
class EigerTrigger(Device):
def __init__(self, detector, device_id=None, config=None):
self.detector = detector
Device.__init__(self, device_id, config)
self.connect()
def handle_configuration(self):
start_at = time.time()
self._config_attributes()
self.set_status(STATUS_CONFIGURED)
self.logger.info(
"Device configuration took {} sec.".format(time.time() - start_at)
)
def _init_attributes(self):
Device._init_attributes(self)
self.create_attribute(
ATTR_TRIGGERS_SP,
default_value=0,
description="Number of triggers to generate",
default_type=np.uint64,
min_value=0,
)
self.create_attribute(
ATTR_TRIGGERS_GENERATED,
default_value=0,
readonly=True,
description="Number of generated triggers",
default_type=np.uint64,
min_value=0,
)
self.create_attribute(
ATTR_TRIGGER_PULSE_WIDTH,
default_value=0,
description="Width of the trigger pulse",
default_type=np.float64,
min_value=0,
decimals=3,
unit="sec",
)
@expose_method({"Exposure": ATTR_TRIGGER_PULSE_WIDTH, "Frames": ATTR_TRIGGERS_SP})
def acquire_frame(self, exposure, frames):
frames = self.to_device_unit(ATTR_TRIGGERS_SP, frames).m
exposure = self.to_device_unit(ATTR_TRIGGER_PULSE_WIDTH, exposure).m
self.set_value(ATTR_TRIGGERS_SP, frames)
self.set_value(ATTR_TRIGGER_PULSE_WIDTH, exposure)
self.set_value(ATTR_TRIGGERS_GENERATED, 0)
Thread(target=self._acquisition_thread).start()
def _acquisition_thread(self):
raise NotImplementedError
@expose_method()
def stop(self):
self.set_value(ATTR_STATUS, STATUS_IDLE)
self.detector.stop()
class EigerSwTrigger(EigerTrigger):
def _acquisition_thread(self):
self.set_status(STATUS_BUSY)
while (
self.get_value(ATTR_TRIGGERS_GENERATED) != self.get_value(ATTR_TRIGGERS_SP)
and self.get_value(ATTR_STATUS) == STATUS_BUSY
):
self.detector.Trigger()
sleep(self.get_value(ATTR_TRIGGER_PULSE_WIDTH))
self.set_value(
ATTR_TRIGGERS_GENERATED, self.get_value(ATTR_TRIGGERS_GENERATED) + 1
)
self.set_status(STATUS_IDLE)
......@@ -371,153 +371,3 @@ class DeviceTangoLambdaDetector(DeviceTango):
@expose_method()
def stop(self):
self.StopAcq()
class DeviceTangoEigerDetector(DeviceTango):
"""
Implementation of the Eiger detector.
:param path: path of the Tango server, e.g.
tango://haspp11oh:10000/p11/simplon_detector/eh.01
:param file_writer: instance of a Simplon FileWriter Tango device
:param eiger_stream: instance of a Simplon Stream Tango device
:param device_id: str, the name to give to this device
:param config: a dictionary of configuration parameters
"""
def __init__(
self, path, file_writer=None, eiger_stream=None, device_id=None, config=None
):
self.file_writer = file_writer
self.eiger_stream = eiger_stream
DeviceTango.__init__(self, path, device_id, config)
def handle_configuration(self):
"""Configure the Eiger device."""
try:
super().handle_configuration()
self.check_interface()
if self.file_writer is not None:
self.share_group(self.file_writer, None, "FileWriter")
if self.eiger_stream is not None:
self.share_group(self.eiger_stream, None, "EigerStream")
except self.tango_exceptions:
# Exception was handled on DeviceTango level
pass
def check_interface(self):
"""
Check the configuration of the Eiger interface.
Either the FileWriter or the Streaming interface has to be enabled (not both).
Mode 0=enabled, 1=Disabled
"""
if (
self.file_writer is not None
and self.file_writer.get_value("Mode") == "0"
and self.eiger_stream is not None
and self.eiger_stream.get_value("Mode") == "0"
):
raise DeviceError(
"Enable either the File Writer or the Streaming Interface, not both"
)
if self.file_writer is None and self.eiger_stream is None:
raise DeviceError("No interface provided for the Eiger!")
def _init_attributes(self):
DeviceTango._init_attributes(self)
self.create_attribute("shotDir")
self.create_attribute("scanDir")
self.create_attribute("scanFilenamePattern")
self.create_attribute(
ATTR_FRAME_COUNT,
default_value=0,
readonly=True,
description="Internal frame counter",
default_type=np.uint64,
min_value=0,
)
def _config_interface(self, saving_directory, prefix, nb_frames=1):
"""
Configure the Eiger interface.
:param saving_directory: str, the name of the saving directory
:param prefix: str, prefix to add to the file names (filewriter) or folder name
for the dataset (stream subsystem)
:param nb_frames: int, number of frames to be saved per .h5 file for the
Filewiter
"""
now = datetime.datetime.now()
if self.file_writer is not None:
path = os.path.join(
self.get_value(saving_directory),
f"{prefix}_{self.device_id}_{now.strftime('%Y-%m-%d_%H:%M:%S')}",
)
self.set_attribute(["FileWriter", "Mode", VALUE], 0)
self.set_attribute(["FileWriter", "NamePattern", VALUE], path)
self.set_attribute(["FileWriter", "NimagesPerFile", VALUE], nb_frames)
else: # use the Stream subsystem
path = '{"series_name": "' + prefix + '"}'
self.set_attribute(["EigerStream", "Mode", VALUE], 0)
self.set_attribute(["EigerStream", "HeaderAppendix", VALUE], path)
self.set_attribute(["EigerStream", "ImageAppendix", VALUE], path)
@expose_method({"Name": "FilePrefix", "Exposure": "FrameTime", "Frames": "Nimages"})
def acquire_frame(self, Name, Exposure, Frames):
Frames = int(device_units(self, "Nimages", Frames).m)
self._config_interface(
saving_directory="shotDir", prefix=Name, nb_frames=Frames
)
Exposure = float(device_units(self, "FrameTime", Exposure).m)
# According to Jan 10ms is lowest frame time
frame_time = 10e-3 if Exposure < 10e-3 else Exposure
self.set_attribute(["TriggerMode", VALUE], 3)
self.set_attribute(["FrameTime", VALUE], frame_time)
self.set_attribute(["CountTime", VALUE], Exposure)
self.set_attribute(["Ntrigger", VALUE], 1)
self.set_attribute(["Nimages", VALUE], Frames)
self.Arm()
self.Trigger()
self.set_value(ATTR_FRAME_COUNT, self.get_value(ATTR_FRAME_COUNT) + Frames)
@expose_method({"Exposure": "FrameTime"})
def live_view(self, Exposure):
"""
Live view of the Eiger detector.
This is useful when aligning the optics or the sample. Images will not be saved.
:param Exposure: desired exposure time per frame
"""
self.logger.info("Live view has started.")
self.set_attribute(["TriggerMode", VALUE], 3)
if self.file_writer is not None:
self.logger.info("FileWriter disabled")
self.set_attribute(["FileWriter", "Mode", VALUE], 1)
self.set_attribute(["FileWriter", "NimagesPerFile", VALUE], 1000000)
if self.eiger_stream is not None:
self.logger.info("EigerStream disabled")
self.set_attribute(["EigerStream", "Mode", VALUE], 1)
Exposure = float(device_units(self, "FrameTime", Exposure).m)
# According to Jan 10ms is lowest frame time
frame_time = 10e-3 if Exposure < 10e-3 else Exposure
self.set_attribute(["FrameTime", VALUE], frame_time)
self.set_attribute(["CountTime", VALUE], Exposure)
self.set_attribute(["Ntrigger", VALUE], 1)
self.set_attribute(["Nimages", VALUE], 1000000)
self.Arm()
self.Trigger()
self.set_value(ATTR_FRAME_COUNT, self.get_value(ATTR_FRAME_COUNT) + 1000000)
@expose_method()
def stop(self):
if self.get_value(ATTR_STATUS) == STATUS_BUSY:
self.logger.info("Acquisition aborted")
self.Abort()
self.Disarm()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment