Commit d158b8df authored by Jerome Carnis's avatar Jerome Carnis 🦊
Browse files

deviceEigerP11.py: use an enum for the trigger mode

parent 72065d9f
Pipeline #28408 failed with stages
in 4 minutes and 59 seconds
import datetime
from enum import IntEnum
import os
import time
from time import sleep
from threading import Thread
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
import numpy as np
......@@ -28,10 +29,14 @@ from kamzik3.devices.general.deviceScanner import DeviceScanner
from kamzik3.macro.scan import Scan
from kamzik3.snippets.snippetsDecorators import expose_method
"""
TriggerMode 0, 1, 2, 3 correspond to "EXTE", "EXTS", "INTE", "INTS" in Eiger
documentation.
"""
class TriggerMode(IntEnum):
"""The trigger modes can be found in Eiger documentation."""
EXTE = 0
EXTS = 1
INTE = 2
INTS = 3
class DeviceTangoEigerDetector(DeviceTango):
......@@ -51,12 +56,12 @@ class DeviceTangoEigerDetector(DeviceTango):
def __init__(
self, path, file_writer=None, eiger_stream=None, device_id=None, config=None
):
) -> None:
self.file_writer = file_writer
self.eiger_stream = eiger_stream
DeviceTango.__init__(self, path, device_id, config)
def handle_configuration(self):
def handle_configuration(self) -> None:
"""Configure the Eiger device."""
try:
super().handle_configuration()
......@@ -69,7 +74,7 @@ class DeviceTangoEigerDetector(DeviceTango):
# Exception was handled on DeviceTango level
pass
def check_interface(self):
def check_interface(self) -> None:
"""
Check the configuration of the Eiger interface.
......@@ -90,7 +95,7 @@ class DeviceTangoEigerDetector(DeviceTango):
if self.file_writer is None and self.eiger_stream is None:
raise DeviceError("No interface provided for the Eiger!")
def _init_attributes(self):
def _init_attributes(self) -> None:
DeviceTango._init_attributes(self)
self.create_attribute("shotDir")
self.create_attribute("scanDir")
......@@ -104,7 +109,7 @@ class DeviceTangoEigerDetector(DeviceTango):
min_value=0,
)
def _config_interface(self, saving_directory, prefix, nb_frames=1):
def _config_interface(self, saving_directory, prefix, nb_frames=1) -> None:
"""
Configure the Eiger interface.
......@@ -130,7 +135,7 @@ class DeviceTangoEigerDetector(DeviceTango):
self.set_attribute(["EigerStream", "ImageAppendix", VALUE], path)
@expose_method({"Name": "FilePrefix", "Exposure": "FrameTime", "Frames": "Nimages"})
def acquire_frame(self, Name, Exposure, Frames):
def acquire_frame(self, Name, Exposure, Frames) -> None:
nb_frames = int(self.to_device_unit("Nimages", Frames).m)
self._config_interface(
saving_directory="shotDir", prefix=Name, nb_frames=nb_frames
......@@ -140,7 +145,7 @@ class DeviceTangoEigerDetector(DeviceTango):
# According to Jan 10ms is lowest frame time
frame_time = 10e-3 if exposure < 10e-3 else exposure
self.set_attribute(["TriggerMode", VALUE], 3)
self.set_attribute(["TriggerMode", VALUE], TriggerMode.INTS.value)
self.set_attribute(["FrameTime", VALUE], frame_time)
self.set_attribute(["CountTime", VALUE], exposure)
self.set_attribute(["Ntrigger", VALUE], 1)
......@@ -150,7 +155,7 @@ class DeviceTangoEigerDetector(DeviceTango):
self.set_value(ATTR_FRAME_COUNT, self.get_value(ATTR_FRAME_COUNT) + nb_frames)
@expose_method({"Exposure": "FrameTime"})
def live_view(self, Exposure):
def live_view(self, Exposure) -> None:
"""
Live view of the Eiger detector.
......@@ -160,7 +165,7 @@ class DeviceTangoEigerDetector(DeviceTango):
"""
frames_per_file = 1000000
self.logger.info("Live view has started.")
self.set_attribute(["TriggerMode", VALUE], 3)
self.set_attribute(["TriggerMode", VALUE], TriggerMode.INTS.value)
if self.file_writer is not None:
self.logger.info("FileWriter disabled")
self.set_attribute(["FileWriter", "Mode", VALUE], 1)
......@@ -183,7 +188,7 @@ class DeviceTangoEigerDetector(DeviceTango):
)
@expose_method()
def stop(self):
def stop(self) -> None:
"""
Stop the acquisition.
......@@ -204,12 +209,12 @@ class EigerTrigger(Device):
is used.
"""
def __init__(self, detector, device_id=None, config=None):
def __init__(self, detector, device_id=None, config=None) -> None:
self.detector = detector
Device.__init__(self, device_id, config)
self.connect()
def handle_configuration(self):
def handle_configuration(self) -> None:
start_at = time.time()
self._config_attributes()
self.set_status(STATUS_CONFIGURED)
......@@ -217,7 +222,7 @@ class EigerTrigger(Device):
"Device configuration took {} sec.".format(time.time() - start_at)
)
def _init_attributes(self):
def _init_attributes(self) -> None:
Device._init_attributes(self)
self.create_attribute(
ATTR_TRIGGERS_SP,
......@@ -245,7 +250,7 @@ class EigerTrigger(Device):
)
@expose_method({"Exposure": ATTR_TRIGGER_PULSE_WIDTH, "Frames": ATTR_TRIGGERS_SP})
def acquire_frame(self, Exposure, Frames):
def acquire_frame(self, Exposure, Frames) -> None:
nb_frames = self.to_device_unit(ATTR_TRIGGERS_SP, Frames).m
exposure_time = self.to_device_unit(ATTR_TRIGGER_PULSE_WIDTH, Exposure).m
self.set_value(ATTR_TRIGGERS_SP, nb_frames)
......@@ -253,11 +258,11 @@ class EigerTrigger(Device):
self.set_value(ATTR_TRIGGERS_GENERATED, 0)
Thread(target=self._acquisition_thread).start()
def _acquisition_thread(self):
def _acquisition_thread(self) -> None:
raise NotImplementedError
@expose_method()
def stop(self):
def stop(self) -> None:
self.set_value(ATTR_STATUS, STATUS_IDLE)
self.detector.stop()
......@@ -268,7 +273,7 @@ class EigerSwTrigger(EigerTrigger):
the EigerScanner as the Scanner.
"""
def _acquisition_thread(self):
def _acquisition_thread(self) -> None:
self.set_status(STATUS_BUSY)
while (
self.get_value(ATTR_TRIGGERS_GENERATED) != self.get_value(ATTR_TRIGGERS_SP)
......@@ -288,12 +293,12 @@ class EigerScanner(DeviceScanner):
detector: DeviceTangoEigerDetector,
device_id: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
):
) -> None:
self.detector = detector
DeviceScanner.__init__(self, device_id, config)
self.connect()
def _recount_steps(self, parent_macro, to_link=None):
def _recount_steps(self, parent_macro, to_link=None) -> int:
total_frames = 1
for link in parent_macro.chain:
if link == to_link:
......@@ -307,10 +312,10 @@ class EigerScanner(DeviceScanner):
return total_frames
@expose_method()
def get_scanner_attributes(self):
def get_scanner_attributes(self) -> List[str]:
return []
def on_macro_done(self):
def on_macro_done(self) -> None:
self.detector.stop()
@expose_method()
......@@ -323,7 +328,7 @@ class EigerScanner(DeviceScanner):
scanner_input.on_macro_done = self.on_macro_done
return scanner_input
def _init_new_scan(self, scanner_input, scanner_attributes, parent_macro):
def _init_new_scan(self, scanner_input, scanner_attributes, parent_macro) -> None:
self.logger.info(
"Initiating new scan number {}".format(
kamzik3.session.get_value(ATTR_SCAN_COUNT)
......@@ -344,7 +349,7 @@ class EigerScanner(DeviceScanner):
total_frames = self._recount_steps(parent_macro, scanner_input) * frames_count
self.detector.set_value(ATTR_FRAME_COUNT, 0)
self.detector.set_attribute(["TriggerMode", VALUE], 3)
self.detector.set_attribute(["TriggerMode", VALUE], TriggerMode.INTS.value)
self.detector.set_attribute(["Nimages", VALUE], frames_count)
self.detector.set_attribute(["Ntrigger", VALUE], total_frames)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment