Commit c55866d3 authored by Jerome Carnis's avatar Jerome Carnis 🦊
Browse files

deviceEigerP11.py: renamed from deviceEiger.py, add docstrings

parent a286a183
Pipeline #28052 failed with stages
in 5 minutes and 56 seconds
......@@ -31,7 +31,10 @@ from kamzik3.snippets.snippetsDecorators import expose_method
class DeviceTangoEigerDetector(DeviceTango):
"""
Implementation of the Eiger detector.
Implementation of the Eiger detector at P11.
The method acquire_frame of this device can be used to take image(s) without scan.
For a scan, use the device EigerSwTrigger instead.
:param path: path of the Tango server, e.g.
tango://haspp11oh:10000/p11/simplon_detector/eh.01
......@@ -179,6 +182,92 @@ class DeviceTangoEigerDetector(DeviceTango):
self.Disarm()
class EigerTrigger(Device):
"""
Base class for triggering the Eiger detector.
The method _acquisition_thread needs to be overriden depending on which trigger
is used.
"""
def __init__(self, detector, device_id=None, config=None):
self.detector = detector
Device.__init__(self, device_id, config)
self.connect()
def handle_configuration(self):
start_at = time.time()
self._config_attributes()
self.set_status(STATUS_CONFIGURED)
self.logger.info(
"Device configuration took {} sec.".format(time.time() - start_at)
)
def _init_attributes(self):
Device._init_attributes(self)
self.create_attribute(
ATTR_TRIGGERS_SP,
default_value=0,
description="Number of triggers to generate",
default_type=np.uint64,
min_value=0,
)
self.create_attribute(
ATTR_TRIGGERS_GENERATED,
default_value=0,
readonly=True,
description="Number of generated triggers",
default_type=np.uint64,
min_value=0,
)
self.create_attribute(
ATTR_TRIGGER_PULSE_WIDTH,
default_value=0,
description="Width of the trigger pulse",
default_type=np.float64,
min_value=0,
decimals=3,
unit="sec",
)
@expose_method({"Exposure": ATTR_TRIGGER_PULSE_WIDTH, "Frames": ATTR_TRIGGERS_SP})
def acquire_frame(self, exposure, frames):
frames = self.to_device_unit(ATTR_TRIGGERS_SP, frames).m
exposure = self.to_device_unit(ATTR_TRIGGER_PULSE_WIDTH, exposure).m
self.set_value(ATTR_TRIGGERS_SP, frames)
self.set_value(ATTR_TRIGGER_PULSE_WIDTH, exposure)
self.set_value(ATTR_TRIGGERS_GENERATED, 0)
Thread(target=self._acquisition_thread).start()
def _acquisition_thread(self):
raise NotImplementedError
@expose_method()
def stop(self):
self.set_value(ATTR_STATUS, STATUS_IDLE)
self.detector.stop()
class EigerSwTrigger(EigerTrigger):
"""
Use this class as the Device in the macro templates if you want to do a scan, with
the EigerScanner as the Scanner.
"""
def _acquisition_thread(self):
self.set_status(STATUS_BUSY)
while (
self.get_value(ATTR_TRIGGERS_GENERATED) != self.get_value(ATTR_TRIGGERS_SP)
and self.get_value(ATTR_STATUS) == STATUS_BUSY
):
self.detector.Trigger()
sleep(self.get_value(ATTR_TRIGGER_PULSE_WIDTH))
self.set_value(
ATTR_TRIGGERS_GENERATED, self.get_value(ATTR_TRIGGERS_GENERATED) + 1
)
self.set_status(STATUS_IDLE)
class EigerScanner(DeviceScanner):
def __init__(
self,
......@@ -254,77 +343,3 @@ class EigerScanner(DeviceScanner):
self.detector.set_attribute(["FileWriter", "NimagesPerFile", VALUE], 1000)
self.detector.Arm()
class EigerTrigger(Device):
def __init__(self, detector, device_id=None, config=None):
self.detector = detector
Device.__init__(self, device_id, config)
self.connect()
def handle_configuration(self):
start_at = time.time()
self._config_attributes()
self.set_status(STATUS_CONFIGURED)
self.logger.info(
"Device configuration took {} sec.".format(time.time() - start_at)
)
def _init_attributes(self):
Device._init_attributes(self)
self.create_attribute(
ATTR_TRIGGERS_SP,
default_value=0,
description="Number of triggers to generate",
default_type=np.uint64,
min_value=0,
)
self.create_attribute(
ATTR_TRIGGERS_GENERATED,
default_value=0,
readonly=True,
description="Number of generated triggers",
default_type=np.uint64,
min_value=0,
)
self.create_attribute(
ATTR_TRIGGER_PULSE_WIDTH,
default_value=0,
description="Width of the trigger pulse",
default_type=np.float64,
min_value=0,
decimals=3,
unit="sec",
)
@expose_method({"Exposure": ATTR_TRIGGER_PULSE_WIDTH, "Frames": ATTR_TRIGGERS_SP})
def acquire_frame(self, exposure, frames):
frames = self.to_device_unit(ATTR_TRIGGERS_SP, frames).m
exposure = self.to_device_unit(ATTR_TRIGGER_PULSE_WIDTH, exposure).m
self.set_value(ATTR_TRIGGERS_SP, frames)
self.set_value(ATTR_TRIGGER_PULSE_WIDTH, exposure)
self.set_value(ATTR_TRIGGERS_GENERATED, 0)
Thread(target=self._acquisition_thread).start()
def _acquisition_thread(self):
raise NotImplementedError
@expose_method()
def stop(self):
self.set_value(ATTR_STATUS, STATUS_IDLE)
self.detector.stop()
class EigerSwTrigger(EigerTrigger):
def _acquisition_thread(self):
self.set_status(STATUS_BUSY)
while (
self.get_value(ATTR_TRIGGERS_GENERATED) != self.get_value(ATTR_TRIGGERS_SP)
and self.get_value(ATTR_STATUS) == STATUS_BUSY
):
self.detector.Trigger()
sleep(self.get_value(ATTR_TRIGGER_PULSE_WIDTH))
self.set_value(
ATTR_TRIGGERS_GENERATED, self.get_value(ATTR_TRIGGERS_GENERATED) + 1
)
self.set_status(STATUS_IDLE)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment