# almemo2490.py (18.57 KiB) - authored by Martin Killenberg
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 2 11:08:12 2023
@author: michael pawelzik
Some parts have been copied from uros mavric and then they have been modified for compatibility.
#### SYNTAX FOR CALLING CLASS OBJECTS ########################################
Look into module play_with_almemo2490.py
##############################################################################
"""
# import python modules
import time
from telnetlib import Telnet
import re
from datetime import datetime
import pandas as pd
import numpy as np
import traceback
from functools import wraps
# python class for error handling of almemo 2490
class err_hdl(Exception):
    """Error-handling helpers for the ALMEMO 2490 driver.

    The class is used as a namespace for two decorator factories:

    * ``do_retry`` re-runs the decorated function when it raises.
    * ``do_handle_exceptions`` swallows an exception, reports it on the
      console and returns a default value instead.

    Both are static methods, so they are applied as
    ``@err_hdl.do_retry(...)`` / ``@err_hdl.do_handle_exceptions(...)``.
    """

    @staticmethod
    def do_retry(ExceptionToCheck=Exception, tries=4, delay=3, backoff=2,
                 default_response=None):
        """Retry calling the decorated function with a growing delay.

        original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
        several lines of code have been modified for functionality

        :param ExceptionToCheck: the exception to check. may be a tuple of
            exceptions to check
        :type ExceptionToCheck: Exception or tuple
        :param tries: number of times to try (not retry) before giving up
        :type tries: int
        :param delay: initial delay between retries in seconds
        :type delay: int
        :param backoff: number of seconds that is ADDED to the delay after
            each failed attempt (e.g. delay=1, backoff=3 waits 1 s, 4 s,
            7 s, ...)
        :param default_response: accepted for interface compatibility only;
            it is currently not used, the decorator always raises when all
            tries have failed
        :type default_response: Value or tuple
        :raises Exception: when all ``tries`` attempts have failed; the last
            caught exception is attached as ``__cause__``
        """
        def decorator_retry(function):
            @wraps(function)
            def function_retry(*args, **kwargs):
                remaining, mdelay = tries, delay
                last_error = None
                while remaining > 0:
                    try:
                        return function(*args, **kwargs)
                    except ExceptionToCheck as error:
                        last_error = error
                        remaining -= 1
                        # only wait when another attempt will follow;
                        # sleeping before giving up would just delay the error
                        if remaining > 0:
                            print("ALMEO2490: Error occured in %s,'Retrying in %d seconds" % (function.__name__, mdelay))
                            time.sleep(mdelay)
                            mdelay += backoff
                # keep the original failure reachable for debugging
                raise Exception('maximum number of retries [%d] reached\nfunction: %s\nmodule: %s'
                                % (tries, function.__name__, function.__module__)) from last_error
            return function_retry  # true decorator
        return decorator_retry

    @staticmethod
    def do_handle_exceptions(ExceptionToCheck=Exception, default_response=None):
        """Catch exceptions of the decorated function and return a default.

        original from:
        https://www.3resource.com/python-excercises/decorator/python-decorator-exercise-9.php
        several lines of code have been modified for functionality

        The exception type, the name and module of the failing function and
        the traceback are printed; the exception itself is not re-raised.

        :param ExceptionToCheck: the exception to check. may be a tuple of
            exceptions to check
        :type ExceptionToCheck: Exception or tuple
        :param default_response: value that is returned when the decorated
            function raises ``ExceptionToCheck``
        :type default_response: Value or tuple
        """
        def exception_decorator(function):
            @wraps(function)
            def function_handle_exception(*args, **kwargs):
                try:
                    return function(*args, **kwargs)
                except ExceptionToCheck as e:
                    print('ALMEO2490: ' + str(type(e).__name__) + " occured in:\nfunction: %s\nmodule: %s"
                          % (function.__name__, function.__module__))
                    # print_tb() writes the traceback itself and returns None,
                    # so its result must not be passed to print()
                    traceback.print_tb(e.__traceback__)
                    return default_response
            return function_handle_exception
        return exception_decorator
class almemo2490:
    """Telnet driver for an ALMEMO 2490 measuring instrument.

    The object opens a Telnet connection on construction and keeps a pandas
    data frame (``meas_buffer``) with one row per active sensor channel.
    ``request_sens_channel_list`` fills the static channel parameters,
    ``request_meas_vals_all_channels`` fills date, time and measured values.

    The object can be used as a context manager; the connection is closed
    when the ``with`` block is left.

    NOTE: there is deliberately no write-only helper. According to the
    original author the device echoes commands, which causes conflicts in
    the communication, so every command is sent through ``query``.
    """

    # columns of the measurement buffer (one row per sensor channel)
    _BUFFER_COLUMNS = ['meas_date', 'meas_time', 'sens_port', 'sens_channel',
                       'meas_val', 'channel_unit', 'channel_name']

    # per column: (search pattern inside one channel entry, characters that
    # are stripped from the match because they only served to locate it)
    _CHANNEL_FIELDS = {
        'sens_channel': (r'([0-9]{2})', ''),
        'channel_unit': (r'( [^ ]{2})', ' '),
        'channel_name': (r'([^ ]+ +\r)', ' \r'),
    }

    def __init__(self, ip='192.168.115.44', timeout=10):
        """Open the connection and create an empty measurement buffer.

        :param ip: ip address of the ALMEMO 2490
        :param timeout: timeout in seconds, used for opening the connection
            and for waiting on replies
        """
        # set first, so that close() / __del__ work even if connecting fails
        self.tn = None
        self.ip_adress = ip
        self.timeout = timeout
        # open telnet connection to ALMEMO 2490
        self.tn = self.open_connection()
        # measurement buffer to store parameter and measured properties of
        # the sensors connected to the ALMEMO 2490
        self.meas_buffer = pd.DataFrame(columns=self._BUFFER_COLUMNS)

    def __enter__(self):
        """Return the object itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, exc_tb=None):
        """Close the connection when the ``with`` block is left.

        The arguments default to ``None`` so that a direct call without
        arguments keeps working. Exceptions are not suppressed.
        """
        self.close()

    def __del__(self):
        """Close the connection when the object is destroyed."""
        self.close()

    @err_hdl.do_retry(tries=5, delay=1, backoff=3)
    def open_connection(self):
        """Open and return a Telnet connection to port 10001 of the device."""
        return Telnet(self.ip_adress, 10001, self.timeout)

    def close(self):
        """Close the telnet connection if one has been opened."""
        # getattr: the attribute is missing if __init__ has never run
        connection = getattr(self, 'tn', None)
        if connection is not None:
            connection.close()

    def query(self, cmd_str, wait_time=1):
        """Send a command to the ALMEMO 2490 and return its reply.

        :param cmd_str: command as string; a carriage return is appended
        :param wait_time: seconds to wait between sending and reading
        :return: reply decoded with code page 437
        """
        # send request to ALMEMO 2490, UTF-8 encoded and terminated with CR
        self.tn.write(bytes(cmd_str, encoding='utf8') + b'\r')
        time.sleep(wait_time)
        # read until END OF TEXT character has been reached or timeout has occured
        raw_reply = self.tn.read_until(b'\x03', self.timeout)
        return raw_reply.decode('cp437')

    @err_hdl.do_handle_exceptions()
    def set_date(self):
        """Set the date of the ALMEMO 2490 to the date of the operating system."""
        # date format: ddmmyy (d: day, m: month, y: year, last 2 digits)
        date_str = datetime.strftime(datetime.now(), "%d%m%y")
        self.query('d' + date_str)

    @err_hdl.do_handle_exceptions()
    def set_time(self):
        """Set the time of the ALMEMO 2490 to the time of the operating system."""
        # time format: hhmmss (h: hour, m: minute, s: second)
        time_str = datetime.strftime(datetime.now(), "%H%M%S")
        self.query('U' + time_str)

    @err_hdl.do_handle_exceptions(default_response="01.01.70")
    def get_date(self):
        """Return the date reported by the ALMEMO 2490.

        :return: first match of the form xx.xx.xx (x within [0-9]) in the
            reply to command P13; "01.01.70" if the request or search fails
        """
        read_buffer = self.query('P13')
        return re.search(r'([0-9]{2}\.[0-9]{2}\.[0-9]{2})', read_buffer).group(1)

    @err_hdl.do_handle_exceptions(default_response="00:00:00")
    def get_time(self):
        """Return the time reported by the ALMEMO 2490.

        :return: first match of the form xx:xx:xx (x within [0-9]) in the
            reply to command P10; "00:00:00" if the request or search fails
        """
        read_buffer = self.query('P10')
        return re.search(r'([0-9]{2}:[0-9]{2}:[0-9]{2})', read_buffer).group(1)

    @err_hdl.do_handle_exceptions()
    def set_channel_name(self, channel, channel_name):
        """Set the name of a channel and verify it by reading it back.

        Per the original author's notes the name can have at most 10
        characters and must not contain white space or semicolons.

        :param channel: channel number as string 'xy'
            (original note: x within [0-9], y within [0-1])
        :param channel_name: new name of the channel
        """
        # select channel for modification
        self.query('M' + channel)
        # first part of the command for changing the channel name
        self.query('f2')
        # second part: '$' + channel name + CR
        # NOTE(review): query() appends a CR as well, so two CRs are sent
        # here - confirm that this is intended
        self.query('$' + channel_name + '\r')
        # read the name back and compare it with the requested one
        name_on_device = self.get_channel_name(channel)
        if name_on_device.strip() != channel_name:
            print('setting of channel name has failed')

    @err_hdl.do_handle_exceptions(default_response="N/A")
    def get_channel_name(self, channel):
        """Return the name of the selected channel.

        :param channel: channel number as string 'xy'
            (original note: x within [0-9], y within [0-1])
        :return: matched name including its trailing blanks and CR, or
            "N/A" if the request or search fails
        """
        # select channel for request of channel name
        self.query('M' + channel)
        read_buffer = self.query('P00')
        # the name is the first run of non-blank characters that is followed
        # by blanks and a CR
        return re.search(r'([^ ]+ +\r)', read_buffer).group(1)

    @err_hdl.do_retry(tries=4, delay=3, backoff=3)
    def request_sens_channel_list(self):
        """Read the list of active measurement channels into ``meas_buffer``.

        For every channel entry in the reply to command P15 the channel
        number, unit and name are extracted and the sensor port is derived
        from the channel number. The measurement buffer is rebuilt from
        scratch on every call, so retries and repeated calls do not leave
        duplicated rows behind.
        """
        self.set_meas_output_format('N0')
        try:
            read_buffer = self.query('P15')
        finally:
            # switch back even if the query raised, so a failed request
            # does not leave the device in the 'N0' format
            self.set_meas_output_format('N2')

        rows = []
        for entry in re.findall(r'([0-9]{2}:DIGI.+\r)', read_buffer):
            row = {}
            for key, (pattern, strip_chars) in self._CHANNEL_FIELDS.items():
                row[key] = re.search(pattern, entry).group(1).strip(strip_chars)
            # the second digit of the channel number determines the port
            row['sens_port'] = 'M' + row['sens_channel'][1]
            rows.append(row)

        # DataFrame.append() no longer exists in pandas >= 2.0; build the
        # frame in one step instead (columns without data stay empty)
        self.meas_buffer = pd.DataFrame(rows, columns=self._BUFFER_COLUMNS)

    @err_hdl.do_retry(tries=10, delay=3, backoff=3)
    def request_meas_vals_all_channels(self, wait_time=0.5):
        """Read date, time and all measured values into ``meas_buffer``.

        The reply to command S1 is expected in the form
        date;time;value channel 1;...;value channel N. The reply is parsed
        and validated completely before the buffer is touched, so the
        buffer is not updated if the request fails.

        :param wait_time: seconds to wait for the reply, passed to ``query``
        :raises ValueError: if date/time have the wrong format, a value is
            not a number or the number of values does not match the number
            of rows in the measurement buffer (triggers a retry)
        """
        read_buffer = self.query('S1', wait_time)
        # first token that starts with two digits and contains no white space
        record = re.search(r'([0-9]{2}[^\s]+)', read_buffer).group(1)
        # change decimal designator to point if not default
        record = record.replace(',', '.')
        # fields are separated by semicolons; quotes are dropped as well
        meas_date, meas_time, *raw_values = re.findall(r'[^;"]+', record)

        # check if extracted date and time have correct format
        datetime.strptime(meas_date, '%d.%m.%y')
        datetime.strptime(meas_time, '%H:%M:%S')
        meas_vals = np.array(raw_values, dtype='float')
        if len(meas_vals) != len(self.meas_buffer):
            raise ValueError('received %d measured values for %d channels'
                             % (len(meas_vals), len(self.meas_buffer)))

        # everything is valid: store the complete measurement
        self.meas_buffer['meas_date'] = meas_date
        self.meas_buffer['meas_time'] = meas_time
        self.meas_buffer['meas_val'] = meas_vals

    @err_hdl.do_handle_exceptions()
    def set_meas_output_format(self, output_format='N2'):
        """Send the command that selects the output format of measured data.

        :param output_format: format command, 'N2' by default
        """
        self.query(output_format)

    @err_hdl.do_handle_exceptions()
    def set_conversion_rate(self, rate=10):
        """Set the conversion rate (measurements per second).

        :param rate: conversion rate; scan times are known for 10, 50, 100
        :return: scan time in seconds for the rate, ``None`` for an unknown
            rate or if the request fails
        """
        scan_time_dict = {10: 0.9, 50: 0.18, 100: 0.09}
        rate = int(rate)
        # query() needs a string; passing the int made bytes() raise a
        # TypeError, so the command was never sent.
        # NOTE(review): confirm against the ALMEMO command reference that
        # the bare number is the correct command for the conversion rate
        self.query(str(rate))
        return scan_time_dict.get(rate)

    @err_hdl.do_handle_exceptions(default_response=(pd.Series(dtype=object), 0))
    def fetch_channel_param_from_meas_buffer(self, pattern='00', index=0):
        """Return the buffer row of a channel selected by number or name.

        All rows that contain ``pattern`` in any column are matches. If
        nothing matches, the rows containing '00' are used instead.

        :param pattern: value to search for, e.g. channel number or name
        :param index: selects one row if several rows match; if it is too
            large the first match is returned
        :return: tuple (row as data series, number of matches);
            (empty series, 0) if an error occurs
        """
        matches = self.meas_buffer[self.meas_buffer.isin([pattern]).any(axis=1)]
        if matches.empty:
            # fall back to channel '00'
            matches = self.meas_buffer[self.meas_buffer.isin(['00']).any(axis=1)]
            print('value is not in data_frame. Default value (channel 00) is returned')

        num_matches = matches.shape[0]
        if index < num_matches:
            meas_channel_data = matches.iloc[index, :]
        else:
            # index too large: return the first match and report the
            # largest valid index (number of matches - 1)
            meas_channel_data = matches.iloc[0, :]
            print('Value for index too large. Set Value to Default (index = 0)\n')
            print('max index: %d' % (num_matches - 1))
        return meas_channel_data, num_matches