Skip to content
Snippets Groups Projects
almemo2490.py 18.57 KiB
# -*- coding: utf-8 -*-
"""
Created on Mon Jan  2 11:08:12 2023

@author: michael pawelzik
Some parts have been copied from uros mavric and then they have been modified for compatibility. 

#### SYNTAX FOR CALLING CLASS OBJECTS ########################################

Look into module play_with_almemo2490.py

##############################################################################
"""


# import pythod modules
import time
from telnetlib import Telnet
import re
from datetime import datetime
import pandas as pd
import numpy as np
import traceback
from functools import wraps


# python class for error handling of almemo 2490
class err_hdl(Exception):
    
    # function for retry decorate object in case of exception
    def do_retry(ExceptionToCheck = Exception, tries=4, delay=3, backoff=2, \
                 default_response = None):
        """Retry calling the decorated function using an backoff and defaults for return
    
        original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
        several lines of code have been modified for functionality
        
        :param ExceptionToCheck: the exception to check. may be a tuple of
            exceptions to check
        :type ExceptionToCheck: Exception or tuple
        :param tries: number of times to try (not retry) before giving up
        :type tries: int
        :param delay: initial delay between retries in seconds
        :type delay: int
        :param backoff: backoff multiplier e.g. value of 2 will double the delay
            each retry
        :param default response: default values to return if all tries fail
        : type default response: Value or tuple
        
        """
        def decorator_retry(function):
    
            @wraps(function)
            def function_retry(*args, **kwargs):
                
                mtries, mdelay = tries, delay
                while mtries > 0:
                    try:
                        return function(*args, **kwargs)
                    except ExceptionToCheck:
                        print("ALMEO2490: Error occured in %s,'Retrying in %d seconds" %(function.__name__, mdelay))
                        time.sleep(mdelay)
                        mtries -= 1
                        mdelay += backoff
                raise Exception('maximum number of retries [%d] reached\nfunction: %s\nmodule: %s' \
                      % (tries, function.__name__,function.__module__)) 
                
            return function_retry # true decorator
            
        return decorator_retry
    
    # function for handle exception
    def do_handle_exceptions(ExceptionToCheck = Exception, default_response = None):
        
        """ error calling the decorated function using defaults for return
        original from: 
        https://www.3resource.com/python-excercises/decorator/python-decorator-exercise-9.php
        
        several lines of code have been modified for functionality
        :param ExceptionToCheck: the exception to check. may be a tuple of
            exceptions to check
        :type ExceptionToCheck: Exception or tuple    
        :param default response: default values to return if all tries fail
        : type default response: Value or tuple
            
        """
        def exception_decorator(function):
            
            def function_handle_exception(*args, **kwargs):
                
                try:
                    return function(*args, **kwargs)
                
                except ExceptionToCheck as e:
                    
                    print('ALMEO2490: ' + str(type(e).__name__) + " occured in:\nfunction: %s\nmodule: %s" \
                          % (function.__name__,function.__module__))
                    print(traceback.print_tb(e.__traceback__))
    
                    return default_response
    
            return function_handle_exception
        
        return exception_decorator


class almemo2490:
   
    
    # constructor of class
    def __init__(self, ip = '192.168.115.44', timeout = 10):
        self.tn = None
        
        # set class variabel with ip adress given in parameter list of constructor
        self.ip_adress = ip 
        
        # set class variable with timeout value given in parameter list of constructor
        self.timeout = timeout
             
        # open telenet connection to ALMEMO 2490
        self.tn = self.open_connection()

        # create data frame for measurment buffer to store parameter and measured
        # properties of connected sensors to ALMEMO 2490
        self.meas_buffer = pd.DataFrame(columns = ['meas_date','meas_time', 'sens_port', \
                                                   'sens_channel', 'meas_val', 'channel_unit', \
                                                   'channel_name'])
   
   

    """
    ##### commands are echoed. Will cause conflicts in communication and there it is obmitted #### 
    # write function to send commands to ALMEMO 2490 that do not send a
    # response
    def write(self, cmd_str, wait_time = 0.5):
     
        # send command to ALMEMO 2490
        self.tn.write(bytes(cmd_str, encoding = 'utf8') + b'\r')
     
        # wait
        time.sleep(wait_time)
    """ 
    @err_hdl.do_retry(tries=5, delay=1, backoff=3)
    def open_connection(self):
        
        return Telnet(self.ip_adress, 10001, self.timeout)
        
    
    # query function to send a command to almemo 2490 and
    # return the response to the command
 
    def query(self, cmd_str, wait_time = 1):
        
        # send request to ALMEMO 2490 encode UTF8 with CR
        self.tn.write(bytes(cmd_str, encoding = 'utf8') + b'\r')
        
        # wait
        time.sleep(wait_time)
        
        # read_buffer = self.tn.read_very_eager()
        # read buffer until END OF TEXT character has been reach or timeout has occured
        read_buffer = self.tn.read_until(b'\x03',self.timeout)
        
        # decode receivved data with CP437 format
        decoded_read_buffer = read_buffer.decode('cp437')
        
        
        # return received data
        return decoded_read_buffer
   

   # method to set date of ALMEMO 2490
    @err_hdl.do_handle_exceptions()
    def set_date(self):
        
       # request date from operating system
       # convert requested date to string
       # date format: ddmmyy (d: day, m: month, y: year, last 2 digits )
       date = datetime.strftime(datetime.now(),"%d%m%y")
        
       # cmd string for setting date of device
       cmd_str = 'd' + date
       
       # send command for setting date to device 
       self.query(cmd_str)
       
   
    
    # method to set time of ALMEMO 2490
    @err_hdl.do_handle_exceptions()
    def set_time(self):
       
        # request date from operating system
        #convert requested time to strinf
        # time format: hhmmss (h:hour, m: minute, s: second)
       time = datetime.strftime(datetime.now(), "%H%M%S")
       
       # cmd string for setting time of device
       cmd_str = 'U' + time
       
       # send command for setting date to device
       self.query(cmd_str)
       
    
    # method to get actual date from ALMEMO 2490 
    @err_hdl.do_handle_exceptions(default_response = "01.01.70")
    def get_date(self):
        
        # request date from ALMEMO 2490
        read_buffer = self.query('P13')
        
        # search date string in reply to command P13 
        # date alway has the following format: xx.xx.xx where x is within [0-9]
        # get time out of read buffer and strip white space characters

        date = re.search(r'([0-9]{2}\.[0-9]{2}\.[0-9]{2})', read_buffer).group(1)
        
        return date
    
    # method to get actual actual time from ALMEMO 2490
    @err_hdl.do_handle_exceptions(default_response = "00:00:00")
    def get_time(self):
        
        # request time from ALMEMO 2490
        read_buffer = self.query('P10')
        
        # search time string in reply to command P10 
        # time alway has the following format: xx:xx:xx where x cis within [0-9]
        # get time out of read buffer and strip white space characters
        time = re.search(r'([0-9]{2}:[0-9]{2}:[0-9]{2})', read_buffer).group(1)
        
        return time
    
    # method to set channel name of selected channel at ALMEMO 2490
    # channel name can be at max 10 characters
    # not allowed characters in name are white space charcters and semicolon
    # numbmer format: 'xy'  where x has to be [0-9] and y has be [0-1]
    @err_hdl.do_handle_exceptions()
    def set_channel_name(self, channel, channel_name):
        
        # select channel for modification 
        self.query('M' + channel)
               
        # send first part of command for changing channel name
        self.query('f2')
        
        # send second part of command for changing channel name + channel name + CR
        self.query('$' + channel_name + '\r')
        
        # call  method for requisting channel name from ALMEMO 2490
        read_buffer = self.get_channel_name(channel)
        
        # compare requested channel name an channel name from parameter list
        # if non equal print error message 
        if read_buffer.strip() != channel_name:
            print('setting of channel name has failed')
        
        
    # method to get channel name from selected channel at ALMEMO 2490
    # numbmer format: 'xy'  where x has to be [0-9] and y has be [0-1]
    @err_hdl.do_handle_exceptions(default_response = "N/A")
    def get_channel_name(self, channel):
        
        # select channel for request of channel name
        self.query('M' + channel)
        
        # send command for requesting channel name to ALMEMO 2490
        read_buffer = self.query('P00')
        
        
        # search channel name in reply to cammand P00
        # name follows after 2$\ in read_buffer
        # get name out of read buffer 
            
        channel_name = re.search(r'([^ ]+ +\r)', read_buffer).group(1)
 
        return channel_name
    
    
    # method to request the list of all active measurement  channels of all sensors
    # that are connected to ALMEMO 2490
    # several parameters (channel number, meas_port, channel_unit, channel_name) are
    # collected during request an stored in the meas buffer data frame of python class
    @err_hdl.do_retry(tries=4, delay=3, backoff=3)
    def request_sens_channel_list(self):
        
        self.set_meas_output_format('N0')
        
        # send command for requesting the list of all active meas channels of all sensors
        # that are connected to ALMEMO 2490
        read_buffer = self.query('P15')
        
        # find channel entries in read buffer and add them to a list
        # a channel entry starts with M\ and ends with CR
        channel_entries = re.findall(r'([0-9]{2}:DIGI.+\r)', read_buffer)
        
        # to get fixed parameter (channel number, measurment name, measurment unit),
        # of a channel entry out of the read buffer search patterns are needed to identify them 
    
        search_patterns = {'sens_channel': r'([0-9]{2})', \
                               'channel_unit': r'( [^ ]{2})', 'channel_name': r'([^ ]+ +\r)'}
        
        # the search pattern contains identification string for the search in front of the
        # measurment parameter of interest and has to be removed -> strip pattern
        strip_patterns = {'sens_channel': '', 'channel_unit': ' ', \
                              'channel_name': ' \r'}            
        
        # dictonary where parameter of a channel are temporary store before they are added
        # to the measurement buffer of the class
        channel_params = {'sens_channel': '', 'channel_unit': '', 'channel_name': ' ', 'sens_port': ''}
            
        
        for element in channel_entries:
            
            for key in search_patterns:
                
               
                # for an channel entry in read buffer parameter is searched in the string,
                # and the stip pattern is removed
                # result of search and strip is stored in dictonary channel params 
                channel_params.update({key: re.search(search_patterns[key] ,element).group(1)})
                channel_params.update({key: channel_params[key].strip(strip_patterns[key])})
                
            
            # with the information of sens_channel the sensor port is determined
       
            sens_port = channel_params['sens_channel'][1]
            channel_params.update({'sens_port': 'M' + sens_port})
            
            # add a row in measurement buffer an fill colums in measurement buffer with
            # collected parameter                        
            self.meas_buffer = self.meas_buffer.append(channel_params, ignore_index= True)
            
            self.set_meas_output_format('N2')
              
        
        
    # method to request all measument values for all measurement channels of all sensors
    # that are connected to ALMEMO 2490
    # if request fails meas_buffer of class is not updated
    @err_hdl.do_retry(tries=10, delay=3, backoff=3)
    def request_meas_vals_all_channels(self, wait_time = 0.5):
        
        # request measured value from all measurment channels of all connected sensor
        # to ALMEMO 2490
        read_buffer = self.query('S1', wait_time)
              
        # search for line in measurment buffer that start with a 2 numbers 
        meas_vals_string = re.search(r'([0-9]{2}[^\s]+)',read_buffer).group(1)


        # change decimal designator to point if not default 
        mod_meas_vals_string = meas_vals_string.replace(',','.')
        
        
        # ALMEMO 2490 returns date, time and the measured values for all
        # sensor channels of all connceted sensors.
        # parameters are separated by semicolon, splitted and stored in list format
        meas_param_list = re.findall(r'[^;"]+', mod_meas_vals_string)
        
        # extraction of date from meas_param_list
        date = meas_param_list.pop(0)
        date = date.strip('\"')
        
        # extraction of time from meas_param_list
        time = meas_param_list.pop(0)
        time = time.strip('\"')
            
        # check if extracted date and time have correct format
        datetime.strptime(date, '%d.%m.%y')
        datetime.strptime(time, '%H:%M:%S')
        
        # store date and time of measurement in meas_buffer of class
        self.meas_buffer.loc[:,'meas_date'] = date
        self.meas_buffer.loc[:,'meas_time'] = time
    
        
        # after exctraction of date and time meas_vals_list does only contain
        # the measured mavules for all sensor channels
        # convert the measured values now to float
        meas_vals_list = np.array(meas_param_list, dtype = 'float')
        
        # sore measured values for all sensor channels in measurment buffer of class
        self.meas_buffer.loc[:, 'meas_val'] = meas_vals_list
            
        
        
    
    # method to set the ouput format for the measured data
    # default option and the only one at ALMEMO 2490 is
    # output in table format separated by semicolon
    # S1 for request replies the following format:
    # date;time;meas value channel 1;meas vale channel 2; ...;meas value channel N 
    @err_hdl.do_handle_exceptions() 
    def set_meas_output_format(self, output_format = 'N2'):
        
        # send command to set the output format the measured data
        self.query (output_format)
        
    # method to the the conversion rate (Measurements per Second) 
    @err_hdl.do_handle_exceptions() 
    def set_conversion_rate(self, rate = 10):
        
        # create as dictonary with the possible conversion rates and the 
        # scan time in seconds for the conversion rate
        scan_time_dict = dict({10: 0.9, 50: 0.18, 100: 0.09})
        
        # send command for setting conversion rate to ALMEMO 2490
        self.query(int(rate))
        
        # fetch scan time for selected conversion rate out of dictonary 
        scan_time = scan_time_dict.get(int(rate))
        
        return scan_time
    
    # method to fetch parameter for selected channel from measrement buffer of class
    # selection possible by channel number of sensor channel or channel name
    # if more than one row of channel buffer fits entry can be selected by optional parameter item
    @err_hdl.do_handle_exceptions(default_response = (pd.Series(), 0))
    def fetch_channel_param_from_meas_buffer(self, pattern = '00', index = 0):
        
        # filter rows of data frame that match to selection
        # results will be a dataframe that contains all matches
        sel_channel_data = self.meas_buffer[self.meas_buffer.isin([pattern]).any(axis=1)] 
        
        # if no entry for selection of sensor channel is found in meas buffer of class,
        # sensor channel '0.0' will be selected as default parameter
        if sel_channel_data.empty == True:
            
            sel_channel_data = self.meas_buffer[self.meas_buffer.isin(['00']).any(axis=1)]
            
            # info message is printed that no channel matches to selection
            print('value is not in data_frame. Default value (channel 00) is returned')                                
        
        # determine number of matches
        num_matches = sel_channel_data.shape[0]
        
        # check if selected entry of filtered measured buffer smaller than number of matches
        # max index is always number of matches -1
        if index < num_matches:
            meas_channel_data = sel_channel_data.iloc[index,:]
        
        else:
            
            # determine the maximum value for index
            # max index = number of matches -1
            max_item_val = num_matches - 1
            
            # return default value which is the fisrt one in the filtered meas_buffer
            # print error message that selected index is too high
            meas_channel_data = sel_channel_data.iloc[0,:]
            print('Value for index too large. Set Value to Default (index = 0)\n')
            print('max index: %d' % max_item_val) 
        
        # data series with selected channel is returned containing all parameter of sensor channel
        # if more entires match to selection data series that should be returned can be selected
        # by optinal parameter item. By default item is set to 0
        # second parameter that is returned by the method is the number of matches
        # that fit to selection criteria
        return meas_channel_data, num_matches 
        
    
    # close telenet connection
    def close(self):
        if self.tn is not None:
            self.tn.close()
        
    def __exit__(self):
        self.close()
    
    # destructor of class
    def __del__(self):
        self.close()