# -*- coding: utf-8 -*-
Created on Mon Jan 2 11:08:12 2023
@author: michael pawelzik
Some parts have been copied from uros mavric and then they have been modified for compatibility.
#### SYNTAX FOR CALLING CLASS OBJECTS ########################################
Look into module
# import pythod modules
import time
from telnetlib import Telnet
import re
from datetime import datetime
import pandas as pd
import numpy as np
import socket
import traceback
class almemo710:
# constructor of class
def __init__(self, ip = '', timeout = 10):
# set class variabel with ip adress given in parameter list of constructor
self.ip_adress = ip
# set class variable with timeout value given in parameter list of constructor
self.timeout = timeout
# error handling if device is not acessable on network
# raise Keyboard Interrupt if connecting fails
# open telenet connection to ALMEMO 710 = Telnet(self.ip_adress, 10001, timeout)
except socket.time:
print("Exception occurred accessing ALMEMO 710 via Telnet\n")
raise KeyboardInterrupt
# create data frame for measurment buffer to store parameter and measured
# properties of connected sensors to ALMEMO 710
self.meas_buffer = pd.DataFrame(columns = ['meas_date','meas_time', 'sens_port', \
'sens_channel', 'meas_val', 'channel_unit', \
# write function to send commands to ALMEMO 710 that do not send a
# response
def write(self, cmd_str, wait_time = 0.5):
# send command to ALMEMO 710, encoding = 'utf8') + b'\r')
# wait
# query function to send a command to almemo 710 and
# return the response to the command
def query(self, cmd_str, wait_time = 0.5):
# send request to ALMEMO 710 encode UTF8 with CR, encoding = 'utf8') + b'\r')
# wait
# read_buffer =
# read buffer until END OF TEXT character has been reach or timeout has occured
read_buffer ='\x03',self.timeout)
# decode receivved data with CP437 format
decoded_read_buffer = read_buffer.decode('cp437')
# return received data
return decoded_read_buffer
# method to set date of ALMEMO 710
def set_date(self):
# request date from operating system
# convert requested date to string
# date format: ddmmyy (d: day, m: month, y: year, last 2 digits )
date = datetime.strftime(,"%d%m%y")
# cmd string for setting date of device
cmd_str = 'd' + date
# send command for setting date to device
# method to set time of ALMEMO 710
def set_time(self):
# request date from operating system
#convert requested time to strinf
# time format: hhmmss (h:hour, m: minute, s: second)
time = datetime.strftime(, "%H%M%S")
# cmd string for setting time of device
cmd_str = 'U' + time
# send command for setting date to device
# method to get actual date from ALMEMO 710
def get_date(self):
# request date from ALMEMO 710
read_buffer = self.query('P13')
# search date string in reply to command P13
# date alway has the following format: xx.xx.xx where x is within [0-9]
# get time out of read buffer and strip white space characters
date ='([0-9]{2}\.[0-9]{2}\.[0-9]{2})', read_buffer).group(1)
return date
# method to get actual actual time from ALMEMO 710
def get_time(self):
# request time from ALMEMO 710
read_buffer = self.query('P10')
# search time string in reply to command P10
# time alway has the following format: xx:xx:xx where x cis within [0-9]
# get time out of read buffer and strip white space characters
time ='([0-9]{2}:[0-9]{2}:[0-9]{2})', read_buffer).group(1)
return time
# method to set channel name of selected channel at ALMEMO 710
# channel name can be at max 10 characters
# not allowed characters in name are white space charcters and semicolon
# channel numbers <= 9: string, format 'x.x' where x has to be within [0-9]
# channel numbers > 9: string, format 'x.xx' where x has to be within [0-9]
def set_channel_name(self, channel, channel_name):
# select channel for modification
self.query('M' + channel)
# send first part of command for changing channel name
# send second part of command for changing channel name + channel name + CR
self.query('$' + channel_name + '\r')
# call method for requisting channel name from ALMEMO 710
read_buffer = self.get_channel_name(channel)
# compare requested channel name an channel name from parameter list
# if non equal print error message
if read_buffer.strip() != channel_name:
print('setting of channel name has failed')
# method to get channel name from selected channel at ALMEMO 710
# channel numbers <= 9: string, format 'x.x' where x has to be within [0-9]
# channel numbers > 9: string, format 'x.xx' where x has to be within [0-9]
def get_channel_name(self, channel):
# select channel for request of channel name
self.query('M' + channel)
# send command for requesting channel name to ALMEMO 710
read_buffer = self.query('P00')
# search channel name in reply to cammand P00
# name follows after 2$\ in read_buffer
# get name out of read buffer
channel_name ='(2\$\\[^;]+)', read_buffer).group(1)
# strip white space characters
return channel_name.strip(r'(2\$\\)')
# method to request the list of all active measurement channels of all sensors
# that are connected to ALMEMO 710
# several parameters (channel number, meas_port, channel_unit, channel_name) are
# collected during request an stored in the meas buffer data frame of python class
def request_sens_channel_list(self):
# send command for requesting the list of all active meas channels of all sensors
# that are connected to ALMEMO 710
read_buffer = self.query('P15')
# find channel entries in read buffer and add them to a list
# a channel entry starts with M\ and ends with CR
channel_entries = re.findall(r'(M\\.+\r)', read_buffer)
# to get fixed parameter (channel number, measurment name, measurment unit),
# of a channel entry out of the read buffer search patterns are needed to identify them
# pattern for sens_channel: M\x.x or M\x.xx where x within [0-9]
# pattern for channel_unit: 1$\xx where x can be every character
# pattern for channel_name: 2$\xx where x can be every character without semicolon
search_patterns = {'sens_channel': r'(M\\[0-9]+\.{1}[0-9]+)', \
'channel_unit': r'(1\$\\.{2})', 'channel_name': r'(2\$\\[^;]*)'}
# the search pattern contains identification string for the search in front of the
# measurment parameter of interest and has to be removed -> strip pattern
strip_patterns = {'sens_channel': r'(M\\)', 'channel_unit': r'(1\$\\)', \
'channel_name': r'(2\$\\)'}
# dictonary where parameter of a channel are temporary store before they are added
# to the measurement buffer of the class
channel_params = {'sens_channel': '', 'channel_unit': '', 'channel_name': '', 'sens_port': ''}
for element in channel_entries:
for key in search_patterns:
# for an channel entry in read buffer parameter is searched in the string,
# and the stip pattern is removed
# result of search and strip is stored in dictonary channel params
channel_params.update({key:[key] ,element).group(1)})
channel_params.update({key: channel_params[key].strip(strip_patterns[key])})
# if search of a parameter fails, default value is set
# default value: 'N/A'
# an error message is printed that contains relevant data
# loop will be continued
except AttributeError as attr_err:
channel_params.update({key: 'N/A'})
print('An error occured during parameter search.' \
'Parameter is set to default (N/A)\n')
# with the information of sens_channel the sensor port is determined
# sens_port = 'M' + numbers in front of the point of the measurment channel
# example: sens_channel = 0.1 -> sens_port = M0
sens_port, _ = channel_params['sens_channel'].split('.')
channel_params.update({'sens_port': 'M' + sens_port})
# add a row in measurement buffer an fill colums in measurement buffer with
# collected parameter
self.meas_buffer = self.meas_buffer.append(channel_params, ignore_index= True)
# method to request all measument values for all measurement channels of all sensors
# that are connected to ALMEMO 710
# if request fails meas_buffer of class is not updated
def request_meas_vals_all_channels(self, wait_time = 0.5):
# request measured value from all measurment channels of all connected sensor
# to ALMEMO 710
read_buffer = self.query('S1', wait_time)
# search for line in measurment buffer that starts with digit and ends with digit
meas_vals_string ='([0-9].+[0-9])',read_buffer).group(1)
# change decimal designator to point if not default
mod_meas_vals_string = meas_vals_string.replace(',','.')
# ALMEMO 710 returns date, time and the measured values for all
# sensor channels of all connceted sensors.
# parameters are separated by semicolon, splitted and stored in list format
meas_param_list = mod_meas_vals_string.split(';')
# extraction of date from meas_param_list
date = meas_param_list.pop(0)
# extraction of time from meas_param_list
time = meas_param_list.pop(0)
# check if extracted date and time have correct format
datetime.strptime(date, '%d.%m.%y')
datetime.strptime(time, '%H:%M:%S')
# store date and time of measurement in meas_buffer of class
self.meas_buffer.loc[:, 'meas_date'] = date
self.meas_buffer.loc[:, 'meas_time'] = time
# after exctraction of date and time meas_vals_list does only contain
# the measured mavules for all sensor channels
# convert the measured values now to float
meas_vals_list = np.array(meas_param_list, dtype = 'float')
# sore measured values for all sensor channels in measurment buffer of class
self.meas_buffer.loc[:, 'meas_val'] = meas_vals_list
# if no line with measured values is found and error message is
# printed that contains relevant data
except AttributeError as att_err:
print('An error occured during search of meas_vals_string.\n')
# if list of measured values is empty an error message is
# printed that contains relevant data
except IndexError as ind_err:
print('List of measured values is empty. Date/time could not' \
'be extracted.\n')
except ValueError as val_err:
# determine source code which has caused error
val_err_initiator = traceback.extract_tb(val_err.__traceback__, limit=None)[0].line
# check for date format error
if 'date' in val_err_initiator:
print('String does not contain correct format for date\n')
# check for time format error and print error message that time format is incorrect
elif 'time' in val_err_initiator:
print('String does not contain correct format for time\n')
# print error message that mesured values contain parameter that can not
# be converted to float
print('List with measured values contain parameter that could ' \
'not be converted to float\n')
# print error message with relevant error data
# method to set the ouput format for the measured data
# default option and the only one at ALMEMO 710 is
# output in table format separated by semicolon
# S1 for request replies the following format:
# date;time;meas value channel 1;meas vale channel 2; ...;meas value channel N
def set_meas_output_format(self, output_format = 'N2'):
# send command to set the output format the measured data
self.query (output_format)
# method to the the conversion rate (Measurements per Second)
def set_conversion_rate(self, rate = 10):
# create as dictonary with the possible conversion rates and the
# scan time in seconds for the conversion rate
scan_time_dict = dict({10: 0.9, 50: 0.18, 100: 0.09})
# send command for setting conversion rate to ALMEMO 710
# fetch scan time for selected conversion rate out of dictonary
scan_time = scan_time_dict.get(int(rate))
return scan_time
# method to fetch parameter for selected channel from measrement buffer of class
# selection possible by channel number of sensor channel or channel name
# if more than one row of channel buffer fits entry can be selected by optional parameter item
def fetch_channel_param_from_meas_buffer(self, pattern = '0.0', index = 0):
# filter rows of data frame that match to selection
# results will be a dataframe that contains all matches
sel_channel_data = self.meas_buffer[self.meas_buffer.isin([pattern]).any(axis=1)]
# if no entry for selection of sensor channel is found in meas buffer of class,
# sensor channel '0.0' will be selected as default parameter
if sel_channel_data.empty == True:
sel_channel_data = self.meas_buffer[self.meas_buffer.isin(['0.0']).any(axis=1)]
# info message is printed that no channel matches to selection
print('value is not in data_frame. Default value (channel 0.0) is returned')
# determine number of matches
num_matches = sel_channel_data.shape[0]
# check if selected entry of filtered measured buffer smaller than number of matches
# max index is always number of matches -1
if index < num_matches:
meas_channel_data = sel_channel_data.iloc[index,:]
# determine the maximum value for index
# max index = number of matches -1
max_item_val = num_matches - 1
# return default value which is the fisrt one in the filtered meas_buffer
# print error message that selected index is too high
meas_channel_data = sel_channel_data.iloc[0,:]
print('Value for index too large. Set Value to Default (index = 0)\n')
print('max index: %d' % max_item_val)
# data series with selected channel is returned containing all parameter of sensor channel
# if more entires match to selection data series that should be returned can be selected
# by optinal parameter item. By default item is set to 0
# second parameter that is returned by the method is the number of matches
# that fit to selection criteria
return meas_channel_data, num_matches
# close telenet connection
def close(self):
# destructor of class
def __del__(self):
\ No newline at end of file
# -*- coding: utf-8 -*-
Created on Mon Jan 2 16:55:57 2023
@author: michael pawelzik
# import module with python class for ahlborn
import almemo710
# variables for selection in used methods
sel_channel_no = '0.0'
sel_channel_name = 'T,t'
item = 0
# create object for python class of almemo710
almemo710_obj = almemo710.almemo710(ip='')
# request date from almemo 710
date = almemo710_obj.get_date()
# request time from almemo 710
time = almemo710_obj.get_time()
# set name of measurment channel 0.1 from almemo 710
# request name of measurement channel from almemo 710
channel_name = almemo710_obj.get_channel_name(sel_channel_no)
# set output format for measured data
# parameter should be N2 or left empty
# output format is ste to table
almemo710_obj. set_meas_output_format()
# request list of all sensors channels that are currently active
# store this data in data frame of class_object
# trigger single measurement an request the measured values for all measurment channels
# of almemo 710
# store measured values in data frame of python class
# get measuerement buffer of class object
meas_buffer = almemo710_obj.meas_buffer
# fetch parameter of selected measurement channel from meas_buffer of class object
# input of channel name or channel number is possible
# numbmer format: 'x.x' or x.xx when number > 10 where is has to be [0-9]
meas_channel_data, num_matches = almemo710_obj. \
fetch_channel_param_from_meas_buffer(pattern = sel_channel_name, index = item)
# get measurement date from selected measurement channel
meas_date = meas_channel_data.meas_date
# get measurement time from selected measurement channel
meas_time = meas_channel_data.meas_time
# get measurement value from selected measurement channel
meas_value = meas_channel_data.meas_val
# get sensor unit from selected measurement channel
channel_unit = meas_channel_data.channel_unit
# get channel name from selected measurement channel
channel_name = meas_channel_data.channel_name
# get channel number from selected measurement channel
sens_channel = meas_channel_data.sens_channel
# close telnet connection
