diff --git a/Python_script/almemo710.py b/Python_script/almemo710.py new file mode 100644 index 0000000000000000000000000000000000000000..63b8340f87d1a7ff55ace4a3b21534ac77efd56a --- /dev/null +++ b/Python_script/almemo710.py @@ -0,0 +1,430 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Jan 2 11:08:12 2023 + +@author: michael pawelzik +Some parts have been copied from uros mavric and then they have been modified for compatibility. + +#### SYNTAX FOR CALLING CLASS OBJECTS ######################################## + +Look into module play_with_almemo710.py + +############################################################################## +""" + + +# import pythod modules +import time +from telnetlib import Telnet +import re +from datetime import datetime +import pandas as pd +import numpy as np +import socket +import traceback + + +class almemo710: + + + # constructor of class + def __init__(self, ip = '192.168.115.94', timeout = 10): + + # set class variabel with ip adress given in parameter list of constructor + self.ip_adress = ip + + # set class variable with timeout value given in parameter list of constructor + self.timeout = timeout + + # error handling if device is not acessable on network + # raise Keyboard Interrupt if connecting fails + try: + # open telenet connection to ALMEMO 710 + self.tn = Telnet(self.ip_adress, 10001, timeout) + + except socket.time: + print("Exception occurred accessing ALMEMO 710 via Telnet\n") + raise KeyboardInterrupt + + # create data frame for measurment buffer to store parameter and measured + # properties of connected sensors to ALMEMO 710 + self.meas_buffer = pd.DataFrame(columns = ['meas_date','meas_time', 'sens_port', \ + 'sens_channel', 'meas_val', 'channel_unit', \ + 'channel_name']) + + + + + # write function to send commands to ALMEMO 710 that do not send a + # response + def write(self, cmd_str, wait_time = 0.5): + + # send command to ALMEMO 710 + self.tn.write(bytes(cmd_str, encoding = 'utf8') + b'\r') + + # wait + time.sleep(wait_time) + + + # query function to send a command to almemo 710 and + # return the response to the command + def query(self, cmd_str, wait_time = 0.5): + + # send request to ALMEMO 710 encode UTF8 with CR + self.tn.write(bytes(cmd_str, encoding = 'utf8') + b'\r') + + # wait + time.sleep(wait_time) + + # read_buffer = self.tn.read_very_eager() + # read buffer until END OF TEXT character has been reach or timeout has occured + read_buffer = self.tn.read_until(b'\x03',self.timeout) + + # decode receivved data with CP437 format + decoded_read_buffer = read_buffer.decode('cp437') + + # return received data + return decoded_read_buffer + + + # method to set date of ALMEMO 710 + def set_date(self): + + # request date from operating system + # convert requested date to string + # date format: ddmmyy (d: day, m: month, y: year, last 2 digits ) + date = datetime.strftime(datetime.now(),"%d%m%y") + + # cmd string for setting date of device + cmd_str = 'd' + date + + # send command for setting date to device + self.query(cmd_str) + + + + # method to set time of ALMEMO 710 + def set_time(self): + + # request date from operating system + #convert requested time to strinf + # time format: hhmmss (h:hour, m: minute, s: second) + time = datetime.strftime(datetime.now(), "%H%M%S") + + # cmd string for setting time of device + cmd_str = 'U' + time + + # send command for setting date to device + self.query(cmd_str) + + + # method to get actual date from ALMEMO 710 + def get_date(self): + + # request date from ALMEMO 710 + read_buffer = self.query('P13') + + # search date string in reply to command P13 + # date alway has the following format: xx.xx.xx where x is within [0-9] + # get time out of read buffer and strip white space characters + date = re.search(r'([0-9]{2}\.[0-9]{2}\.[0-9]{2})', read_buffer).group(1) + + return date + + # method to get actual actual time from ALMEMO 710 + def get_time(self): + + # request time from ALMEMO 710 + read_buffer = self.query('P10') + + # search time string in reply to command P10 + # time alway has the following format: xx:xx:xx where x cis within [0-9] + # get time out of read buffer and strip white space characters + time = re.search(r'([0-9]{2}:[0-9]{2}:[0-9]{2})', read_buffer).group(1) + + return time + + # method to set channel name of selected channel at ALMEMO 710 + # channel name can be at max 10 characters + # not allowed characters in name are white space charcters and semicolon + # channel numbers <= 9: string, format 'x.x' where x has to be within [0-9] + # channel numbers > 9: string, format 'x.xx' where x has to be within [0-9] + def set_channel_name(self, channel, channel_name): + + # select channel for modification + self.query('M' + channel) + + # send first part of command for changing channel name + self.query('f2') + + # send second part of command for changing channel name + channel name + CR + self.query('$' + channel_name + '\r') + + # call method for requisting channel name from ALMEMO 710 + read_buffer = self.get_channel_name(channel) + + # compare requested channel name an channel name from parameter list + # if non equal print error message + if read_buffer.strip() != channel_name: + print('setting of channel name has failed') + + + # method to get channel name from selected channel at ALMEMO 710 + # channel numbers <= 9: string, format 'x.x' where x has to be within [0-9] + # channel numbers > 9: string, format 'x.xx' where x has to be within [0-9] + def get_channel_name(self, channel): + + # select channel for request of channel name + self.query('M' + channel) + + # send command for requesting channel name to ALMEMO 710 + read_buffer = self.query('P00') + + # search channel name in reply to cammand P00 + # name follows after 2$\ in read_buffer + # get name out of read buffer + channel_name = re.search(r'(2\$\\[^;]+)', read_buffer).group(1) + + # strip white space characters + return channel_name.strip(r'(2\$\\)') + + + # method to request the list of all active measurement channels of all sensors + # that are connected to ALMEMO 710 + # several parameters (channel number, meas_port, channel_unit, channel_name) are + # collected during request an stored in the meas buffer data frame of python class + def request_sens_channel_list(self): + + # send command for requesting the list of all active meas channels of all sensors + # that are connected to ALMEMO 710 + read_buffer = self.query('P15') + + # find channel entries in read buffer and add them to a list + # a channel entry starts with M\ and ends with CR + channel_entries = re.findall(r'(M\\.+\r)', read_buffer) + + # to get fixed parameter (channel number, measurment name, measurment unit), + # of a channel entry out of the read buffer search patterns are needed to identify them + # pattern for sens_channel: M\x.x or M\x.xx where x within [0-9] + # pattern for channel_unit: 1$\xx where x can be every character + # pattern for channel_name: 2$\xx where x can be every character without semicolon + search_patterns = {'sens_channel': r'(M\\[0-9]+\.{1}[0-9]+)', \ + 'channel_unit': r'(1\$\\.{2})', 'channel_name': r'(2\$\\[^;]*)'} + + # the search pattern contains identification string for the search in front of the + # measurment parameter of interest and has to be removed -> strip pattern + strip_patterns = {'sens_channel': r'(M\\)', 'channel_unit': r'(1\$\\)', \ + 'channel_name': r'(2\$\\)'} + + # dictonary where parameter of a channel are temporary store before they are added + # to the measurement buffer of the class + channel_params = {'sens_channel': '', 'channel_unit': '', 'channel_name': '', 'sens_port': ''} + + + for element in channel_entries: + + for key in search_patterns: + + try: + # for an channel entry in read buffer parameter is searched in the string, + # and the stip pattern is removed + # result of search and strip is stored in dictonary channel params + channel_params.update({key: re.search(search_patterns[key] ,element).group(1)}) + channel_params.update({key: channel_params[key].strip(strip_patterns[key])}) + + # if search of a parameter fails, default value is set + # default value: 'N/A' + # an error message is printed that contains relevant data + # loop will be continued + except AttributeError as attr_err: + + channel_params.update({key: 'N/A'}) + + print(read_buffer) + print('An error occured during parameter search.' \ + 'Parameter is set to default (N/A)\n') + print(traceback.print_tb(attr_err.__traceback__)) + + # with the information of sens_channel the sensor port is determined + # sens_port = 'M' + numbers in front of the point of the measurment channel + # example: sens_channel = 0.1 -> sens_port = M0 + sens_port, _ = channel_params['sens_channel'].split('.') + channel_params.update({'sens_port': 'M' + sens_port}) + + # add a row in measurement buffer an fill colums in measurement buffer with + # collected parameter + self.meas_buffer = self.meas_buffer.append(channel_params, ignore_index= True) + + + # method to request all measument values for all measurement channels of all sensors + # that are connected to ALMEMO 710 + # if request fails meas_buffer of class is not updated + def request_meas_vals_all_channels(self, wait_time = 0.5): + + # request measured value from all measurment channels of all connected sensor + # to ALMEMO 710 + read_buffer = self.query('S1', wait_time) + + try: + + # search for line in measurment buffer that starts with digit and ends with digit + meas_vals_string = re.search(r'([0-9].+[0-9])',read_buffer).group(1) + + # change decimal designator to point if not default + mod_meas_vals_string = meas_vals_string.replace(',','.') + + # ALMEMO 710 returns date, time and the measured values for all + # sensor channels of all connceted sensors. + # parameters are separated by semicolon, splitted and stored in list format + meas_param_list = mod_meas_vals_string.split(';') + + # extraction of date from meas_param_list + date = meas_param_list.pop(0) + + # extraction of time from meas_param_list + time = meas_param_list.pop(0) + + + # check if extracted date and time have correct format + datetime.strptime(date, '%d.%m.%y') + datetime.strptime(time, '%H:%M:%S') + + # store date and time of measurement in meas_buffer of class + self.meas_buffer.loc[:, 'meas_date'] = date + self.meas_buffer.loc[:, 'meas_time'] = time + + + # after exctraction of date and time meas_vals_list does only contain + # the measured mavules for all sensor channels + # convert the measured values now to float + meas_vals_list = np.array(meas_param_list, dtype = 'float') + + # sore measured values for all sensor channels in measurment buffer of class + self.meas_buffer.loc[:, 'meas_val'] = meas_vals_list + + # if no line with measured values is found and error message is + # printed that contains relevant data + except AttributeError as att_err: + + print(read_buffer) + print('An error occured during search of meas_vals_string.\n') + print(traceback.print_tb(att_err.__traceback__)) + + # if list of measured values is empty an error message is + # printed that contains relevant data + except IndexError as ind_err: + + print(read_buffer) + print('List of measured values is empty. Date/time could not' \ + 'be extracted.\n') + print(traceback.print_tb(ind_err.__traceback__)) + + except ValueError as val_err: + + print(read_buffer) + + # determine source code which has caused error + val_err_initiator = traceback.extract_tb(val_err.__traceback__, limit=None)[0].line + + # check for date format error + if 'date' in val_err_initiator: + print('String does not contain correct format for date\n') + + # check for time format error and print error message that time format is incorrect + elif 'time' in val_err_initiator: + print('String does not contain correct format for time\n') + + # print error message that mesured values contain parameter that can not + # be converted to float + else: + print('List with measured values contain parameter that could ' \ + 'not be converted to float\n') + + # print error message with relevant error data + print(traceback.print_tb(val_err.__traceback__)) + + + + # method to set the ouput format for the measured data + # default option and the only one at ALMEMO 710 is + # output in table format separated by semicolon + # S1 for request replies the following format: + # date;time;meas value channel 1;meas vale channel 2; ...;meas value channel N + def set_meas_output_format(self, output_format = 'N2'): + + # send command to set the output format the measured data + self.query (output_format) + + # method to the the conversion rate (Measurements per Second) + def set_conversion_rate(self, rate = 10): + + # create as dictonary with the possible conversion rates and the + # scan time in seconds for the conversion rate + scan_time_dict = dict({10: 0.9, 50: 0.18, 100: 0.09}) + + # send command for setting conversion rate to ALMEMO 710 + self.query(int(rate)) + + # fetch scan time for selected conversion rate out of dictonary + scan_time = scan_time_dict.get(int(rate)) + + return scan_time + + # method to fetch parameter for selected channel from measrement buffer of class + # selection possible by channel number of sensor channel or channel name + # if more than one row of channel buffer fits entry can be selected by optional parameter item + def fetch_channel_param_from_meas_buffer(self, pattern = '0.0', index = 0): + + # filter rows of data frame that match to selection + # results will be a dataframe that contains all matches + sel_channel_data = self.meas_buffer[self.meas_buffer.isin([pattern]).any(axis=1)] + + # if no entry for selection of sensor channel is found in meas buffer of class, + # sensor channel '0.0' will be selected as default parameter + if sel_channel_data.empty == True: + + sel_channel_data = self.meas_buffer[self.meas_buffer.isin(['0.0']).any(axis=1)] + + # info message is printed that no channel matches to selection + print('value is not in data_frame. Default value (channel 0.0) is returned') + + # determine number of matches + num_matches = sel_channel_data.shape[0] + + # check if selected entry of filtered measured buffer smaller than number of matches + # max index is always number of matches -1 + if index < num_matches: + meas_channel_data = sel_channel_data.iloc[index,:] + + else: + + # determine the maximum value for index + # max index = number of matches -1 + max_item_val = num_matches - 1 + + # return default value which is the fisrt one in the filtered meas_buffer + # print error message that selected index is too high + meas_channel_data = sel_channel_data.iloc[0,:] + print('Value for index too large. Set Value to Default (index = 0)\n') + print('max index: %d' % max_item_val) + + # data series with selected channel is returned containing all parameter of sensor channel + # if more entires match to selection data series that should be returned can be selected + # by optinal parameter item. By default item is set to 0 + # second parameter that is returned by the method is the number of matches + # that fit to selection criteria + return meas_channel_data, num_matches + + + # close telenet connection + def close(self): + self.tn.close() + + # destructor of class + def __del__(self): + self.close() + + + + + \ No newline at end of file diff --git a/Python_script/play_with_almemo710.py b/Python_script/play_with_almemo710.py new file mode 100644 index 0000000000000000000000000000000000000000..d938ab5f9475da35130270c84ea4fa2507a55899 --- /dev/null +++ b/Python_script/play_with_almemo710.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Jan 2 16:55:57 2023 + +@author: michael pawelzik +""" + +# import module with python class for ahlborn +import almemo710 + +# variables for selection in used methods +sel_channel_no = '0.0' +sel_channel_name = 'T,t' +item = 0 + +try: + + # create object for python class of almemo710 + almemo710_obj = almemo710.almemo710(ip='192.168.115.94') + + # request date from almemo 710 + date = almemo710_obj.get_date() + + # request time from almemo 710 + time = almemo710_obj.get_time() + + # set name of measurment channel 0.1 from almemo 710 + almemo710_obj.set_channel_name(sel_channel_no,sel_channel_name) + + # request name of measurement channel from almemo 710 + channel_name = almemo710_obj.get_channel_name(sel_channel_no) + + # set output format for measured data + # parameter should be N2 or left empty + # output format is ste to table + almemo710_obj. set_meas_output_format() + + # request list of all sensors channels that are currently active + # store this data in data frame of class_object + almemo710_obj.request_sens_channel_list() + + # trigger single measurement an request the measured values for all measurment channels + # of almemo 710 + # store measured values in data frame of python class + almemo710_obj.request_meas_vals_all_channels() + + # get measuerement buffer of class object + meas_buffer = almemo710_obj.meas_buffer + + # fetch parameter of selected measurement channel from meas_buffer of class object + # input of channel name or channel number is possible + # numbmer format: 'x.x' or x.xx when number > 10 where is has to be [0-9] + meas_channel_data, num_matches = almemo710_obj. \ + fetch_channel_param_from_meas_buffer(pattern = sel_channel_name, index = item) + + # get measurement date from selected measurement channel + meas_date = meas_channel_data.meas_date + + # get measurement time from selected measurement channel + meas_time = meas_channel_data.meas_time + + # get measurement value from selected measurement channel + meas_value = meas_channel_data.meas_val + + # get sensor unit from selected measurement channel + channel_unit = meas_channel_data.channel_unit + + # get channel name from selected measurement channel + channel_name = meas_channel_data.channel_name + + # get channel number from selected measurement channel + sens_channel = meas_channel_data.sens_channel + +finally: + + # close telnet connection + almemo710_obj.close() +