# MeasurementPlot.py (authored by Martin Killenberg)
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import time
import multiprocessing as mp
import queue
import sys
import process_sync
# Different exceptions can be thrown while plotting, depending on the backend.
# We catch them all locally and raise our own exception instead
class PlottingError(Exception):
    """Signals that refreshing the plot failed.

    Wraps whatever backend-specific exception was raised while drawing, so
    callers only have to deal with a single exception type.
    """
class MeasurementPlot:
    """Figure with five stacked scatter subplots that is refreshed from a data frame.

    Subplots (top to bottom):
      1. the two reference signals plus the equilibrium indicator,
      2. chamber temperature / humidity read-back plus the equilibrium indicator,
      3. DUT temperature / humidity,
      4. room temperature / humidity / air pressure,
      5. a configurable pair of traces, selected by ``trace_subplot5``.

    Data frames are either drawn directly with ``draw_in_this_thread()`` or
    queued with ``draw_in_other_thread()`` and consumed by ``drawing_loop()``.
    """

    # Legend labels and y-axis labels of the fifth subplot, per ``trace_subplot5`` selection.
    # Any selection that is not listed here uses the heater entry (see _fifth_subplot_selection).
    _FIFTH_SUBPLOT_KEYS = ("label_trace_1", "label_trace_2", "y_axis", "sec_y_axis")
    _FIFTH_SUBPLOT_VALUES = {
        # environmental conditions in the measurement instrument chamber (almemo sensors)
        'logger_sens': ("temperature\nalmemo sensors\nmeas instr chamber",
                        "humidity\nalmemo sensors\nmeas instr chamber",
                        "TEMPERATURE [°C]", "HUMIDITY [%RH]"),
        # environmental conditions in the measurement instrument chamber (chamber sensors)
        'chamber_sens': ("temperature\nchamber sensors\nmeas instr chamber",
                         "humidity\nchamber sensors\nmeas instr chamber",
                         "TEMPERATURE [°C]", "HUMIDITY [%RH]"),
        # heater activity
        None: ("Temp Heater\nDUT chamber", "Humidity Heater\n DUT chamber",
               "ACTIVITY [%]", "ACTIVITY [%]"),
    }
    # Data frame columns shown in the fifth subplot, per ``trace_subplot5`` selection.
    _FIFTH_SUBPLOT_COLUMNS = {
        'logger_sens': ('TEMP_MEAS_INSTR', 'HUM_MEAS_INSTR'),
        'chamber_sens': ('READBACK_TEMP_MEAS_INSTR', 'READBACK_HUM_MEAS_INSTR'),
        None: ('TEMP_HEATER', 'HUM_HEATER'),
    }
    # Limits returned by get_extended_min_max() when there is no usable data.
    # Same range a constant value of 0 would produce.
    _EMPTY_LIMITS = (-0.05, 0.05)

    def __init__(self, reference_signal_names, title='', trace_subplot5='', legend_loc='upper left',
                 legend_bbox_to_anchor=(1.09, 1)):
        """Create the figure, all axes, the (still empty) scatter traces and the data queue.

        Args:
            reference_signal_names: sequence whose first two entries are the data frame
                column names shown in the first subplot (also used as labels).
            title: text appended to "Measurement " in the figure title.
            trace_subplot5: 'logger_sens', 'chamber_sens', or anything else for the
                heater activity traces in the fifth subplot.
            legend_loc: ``loc`` argument used for every subplot legend.
            legend_bbox_to_anchor: ``bbox_to_anchor`` argument used for every subplot legend.
        """
        self.reference_signal_names = reference_signal_names

        # When running inside an IPython kernel, switch matplotlib to the qt backend
        # so the plot opens in a separate window.
        if 'ipykernel' in sys.modules:
            from IPython import get_ipython
            get_ipython().run_line_magic('matplotlib', 'qt')

        self.trace_subplot5 = trace_subplot5

        # parameters for the legends of all subplots
        self.legend_loc = legend_loc
        self.legend_bbox_to_anchor = legend_bbox_to_anchor

        # Prepare for five subplots
        self.fig, self.ax1 = plt.subplots(5, figsize=(25, 20))
        self.fig.subplots_adjust(bottom=0.1, right=0.8, hspace=0.4)
        self.fig.suptitle("Measurement "+title, color="red")

        # First plot: signal0 and signal1
        first_axis = self.ax1[0]
        self.path_collection_signal0 = first_axis.scatter([], [], c='red', marker='<',
                                                          label=reference_signal_names[0])
        self.path_collection_fit = first_axis.scatter([], [], c='green', marker='.', label=' ')
        self.signal1_axis = first_axis.twinx()
        self.path_collection_signal1 = self.signal1_axis.scatter([], [], c='#3120E0', marker='4',
                                                                 label=reference_signal_names[1])
        self.equi_axis0 = self._offset_twin_axis(first_axis, 75)
        self.path_collection_equi0 = self.equi_axis0.scatter([], [], c='black', marker=".",
                                                             label='Equilibrium_Indicator')
        self._label_primary_axis(first_axis, reference_signal_names[0], 'red')
        self.signal1_axis.set_ylabel(reference_signal_names[1], color='#3120E0')
        self.equi_axis0.set_ylabel("INDICATOR VALUE", color='black')
        # fix range to 0..31 with some extra margin for plotting
        self.equi_axis0.set_ylim(-1, 32)
        self.signal0_legend = self._add_legend(
            first_axis,
            [self.path_collection_signal0, self.path_collection_signal1,
             self.path_collection_equi0, self.path_collection_fit])
        # Empty text annotation anchored left of the first subplot's top-left corner.
        self.annotation = first_axis.annotate('', xy=(0, 1),
                                              xycoords='axes fraction', xytext=(-0.16, 1),
                                              textcoords='axes fraction', fontsize='16',
                                              horizontalalignment='left', verticalalignment='bottom')

        # Second plot: humidity and temperature of the climate chamber, requested from
        # the internal sensors of the chamber
        second_axis = self.ax1[1]
        self.path_collection_temp = second_axis.scatter([], [], c='blue', marker='p',
                                                        label="Chamber Temperature")
        self.humidity_axis = second_axis.twinx()
        self.path_collection_hum = self.humidity_axis.scatter([], [], c='green', marker="*",
                                                              label="Chamber Humidity")
        self.equi_axis1 = self._offset_twin_axis(second_axis, 75)
        self.path_collection_equi1 = self.equi_axis1.scatter([], [], c='black', marker=".",
                                                             label="Equilibrium Indicator")
        self._label_primary_axis(second_axis, "TEMPERATURE [°C] ", 'blue')
        self.humidity_axis.set_ylabel("HUMIDITY [%RH]", color='green')
        self.equi_axis1.set_ylabel("INDICATOR VALUE", color='black')
        self.equi_axis1.set_ylim(-1, 32)
        self._add_legend(second_axis, [self.path_collection_temp, self.path_collection_hum,
                                       self.path_collection_equi1])

        # Third plot: external sensors, DUT temperature and DUT humidity
        third_axis = self.ax1[2]
        self.path_collection_temp_dut = third_axis.scatter([], [], c='red', marker='p',
                                                           label='DUT temperature')
        self.ext_sens_hum_axis = third_axis.twinx()
        self.path_collection_hum_dut = self.ext_sens_hum_axis.scatter([], [], c='purple', marker='*',
                                                                      label='DUT humidity')
        self._label_primary_axis(third_axis, "TEMPERATURE [°C]", 'red')
        self.ext_sens_hum_axis.set_ylabel("HUMIDITY [%RH]", color='purple')
        self._add_legend(third_axis, [self.path_collection_temp_dut, self.path_collection_hum_dut])

        # Fourth plot: external sensors, room temperature, room humidity and room air pressure
        fourth_axis = self.ax1[3]
        self.path_collection_temp_room = fourth_axis.scatter([], [], c='green', marker='*',
                                                             label='room temperature')
        self.sec_ext_hum_sens_axis = fourth_axis.twinx()
        self.path_collection_hum_room = self.sec_ext_hum_sens_axis.scatter([], [], c='orange', marker='>',
                                                                           label='room humidity')
        self.press_axis = self._offset_twin_axis(fourth_axis, 60)
        self.path_collection_air_press_room = self.press_axis.scatter([], [], c='grey', marker='4',
                                                                      label='air pressure room')
        self._label_primary_axis(fourth_axis, "TEMPERATURE [°C]", 'green')
        self.sec_ext_hum_sens_axis.set_ylabel("HUMIDITY [%RH]", color='orange')
        self.press_axis.set_ylabel("AIR PRESSURE [mb]", color='grey')
        self._add_legend(fourth_axis, [self.path_collection_temp_room, self.path_collection_hum_room,
                                       self.path_collection_air_press_room])

        # Fifth plot: configurable pair of traces, see config_fifth_subplot()
        fifth_axis = self.ax1[4]
        subplot_dict = self.config_fifth_subplot()
        self.path_collection_trace_1 = fifth_axis.scatter([], [], c='black', marker='p',
                                                          label=subplot_dict.get('label_trace_1'))
        self.sec_plot_param_axis = fifth_axis.twinx()
        self.path_collection_trace_2 = self.sec_plot_param_axis.scatter(
            [], [], c='brown', marker="*", label=subplot_dict.get('label_trace_2'))
        self._label_primary_axis(fifth_axis, subplot_dict.get('y_axis'), 'black')
        self.sec_plot_param_axis.set_ylabel(subplot_dict.get('sec_y_axis'), color='brown')
        self._add_legend(fifth_axis, [self.path_collection_trace_1, self.path_collection_trace_2])

        # NOTE(review): this runs after all labels and legends have been created, so it
        # presumably only affects text created later - confirm whether that is intended.
        plt.rcParams.update({'font.size': 16})

        # Bounded queue (10 entries) feeding drawing_loop(), taken from the 'spawn'
        # multiprocessing context.
        ctx = mp.get_context('spawn')
        self.data_queue = ctx.Queue(10)

    @staticmethod
    def _offset_twin_axis(axis, offset):
        """Return a twin y-axis of *axis* whose right spine is moved *offset* points outward."""
        twin = axis.twinx()
        twin.spines['right'].set_position(('outward', offset))
        return twin

    @staticmethod
    def _label_primary_axis(axis, y_label, color):
        """Set the common x label, the coloured y label and the dotted grid of a subplot."""
        axis.set_xlabel("TIMESTAMP")
        axis.set_ylabel(y_label, color=color)
        axis.grid(True, linestyle=":")

    def _add_legend(self, axis, path_collections):
        """Attach one legend covering all *path_collections* to *axis* and return it."""
        labels = [pc.get_label() for pc in path_collections]
        return axis.legend(path_collections, labels, loc=self.legend_loc,
                           bbox_to_anchor=self.legend_bbox_to_anchor)

    def stop(self):
        """Queue the stop marker that makes drawing_loop() return.

        This is a blocking ``put``: it waits while the queue is full.
        """
        self.data_queue.put((None, None, True))

    def draw_in_other_thread(self, data_frame, pdf_name=''):
        """Queue a copy of *data_frame* to be drawn by drawing_loop().

        Args:
            data_frame: frame with the columns draw_in_this_thread() reads.
            pdf_name: file name handed on to draw_in_this_thread(); '' means "do not save".

        If the queue is full the frame is dropped without notice.
        """
        # DataFrame.copy() is a deep copy by default, so the queued snapshot is
        # independent of rows the caller appends afterwards.
        frame_copy = data_frame.copy()
        try:
            self.data_queue.put_nowait((frame_copy, pdf_name, False))
        except queue.Full:
            # Deliberate best effort: skip this refresh instead of blocking the
            # caller; a later frame contains the same data anyway.
            pass

    def drawing_loop(self):
        """Draw queued data frames until the stop marker queued by stop() arrives.

        A failing draw is reported on stderr and sets ``process_sync.stop_measurement``.
        The loop itself keeps running so the queue is still drained; frames received
        while the stop flag is set are discarded without drawing.
        """
        while True:
            data_frame, pdf_name, stop_drawing = self.data_queue.get()
            if stop_drawing:
                return

            try:
                if not process_sync.stop_measurement.is_set():
                    # Hand on the queued pdf_name instead of discarding it.
                    self.draw_in_this_thread(data_frame, pdf_name)
            except Exception as e:
                # Don't exit here. Always drain the data queue so the program can terminate correctly
                print(f'Plotting failed, stopping measurement: {e!r}', file=sys.stderr)
                process_sync.stop_measurement.set()

    def _update_trace(self, axis, path_collection, timestamps, values):
        """Rescale the y range of *axis* to *values* and move the scatter points."""
        axis.set_ylim(*self.get_extended_min_max(values))
        path_collection.set_offsets(np.c_[timestamps, values])

    def draw_in_this_thread(self, data_frame, pdf_name=''):
        """Refresh all subplots from *data_frame* and optionally save the figure.

        Args:
            data_frame: frame providing TIMESTAMP, both reference signal columns,
                READBACK_TEMPERATURE, READBACK_HUMIDITY, TEMP_DUT, TEMP_ROOM, HUM_DUT,
                HUM_ROOM, AIR_PRESS_ROOM, EQUILIBRIUM_INDICATOR and the two columns
                selected for the fifth subplot.
            pdf_name: if not empty, the figure is saved under this name.

        Raises:
            PlottingError: if redrawing the canvas fails in interactive mode.
        """
        timestamps = data_frame.TIMESTAMP

        # all five subplots share the same (extended) time range
        x_limits = self.get_extended_min_max(timestamps)
        for axis in self.ax1:
            axis.set_xlim(*x_limits)

        val_trace_1, val_trace_2 = self.refresh_param_fifth_subplot(data_frame)

        # (axis, scatter trace, values) for every trace with an auto-scaled y range
        autoscaled_traces = [
            # first subplot: reference signals
            (self.ax1[0], self.path_collection_signal0, data_frame[self.reference_signal_names[0]]),
            (self.signal1_axis, self.path_collection_signal1, data_frame[self.reference_signal_names[1]]),
            # second subplot: chamber read-back
            (self.ax1[1], self.path_collection_temp, data_frame.READBACK_TEMPERATURE),
            (self.humidity_axis, self.path_collection_hum, data_frame.READBACK_HUMIDITY),
            # third subplot: DUT sensors
            (self.ax1[2], self.path_collection_temp_dut, data_frame.TEMP_DUT),
            (self.ext_sens_hum_axis, self.path_collection_hum_dut, data_frame.HUM_DUT),
            # fourth subplot: room sensors
            (self.ax1[3], self.path_collection_temp_room, data_frame.TEMP_ROOM),
            (self.sec_ext_hum_sens_axis, self.path_collection_hum_room, data_frame.HUM_ROOM),
            (self.press_axis, self.path_collection_air_press_room, data_frame.AIR_PRESS_ROOM),
            # fifth subplot: configurable traces
            (self.ax1[4], self.path_collection_trace_1, val_trace_1),
            (self.sec_plot_param_axis, self.path_collection_trace_2, val_trace_2),
        ]
        for axis, path_collection, values in autoscaled_traces:
            self._update_trace(axis, path_collection, timestamps, values)

        # the fit trace is always cleared here
        self.path_collection_fit.set_offsets(np.c_[[], []])

        # the equilibrium indicator axes keep their fixed y range, only the points move
        equilibrium_points = np.c_[timestamps, data_frame.EQUILIBRIUM_INDICATOR]
        self.path_collection_equi0.set_offsets(equilibrium_points)
        self.path_collection_equi1.set_offsets(equilibrium_points)

        if pdf_name:
            self.fig.savefig(pdf_name)

        # NOTE(review): assumed to be unconditional (blocks when interactive mode is
        # off) - confirm against the properly indented original.
        plt.show()
        if plt.isinteractive():
            try:
                self.fig.canvas.draw()
                self.fig.canvas.flush_events()
            except Exception as e:
                # Which exception is raised depends on the backend; callers only see ours.
                raise PlottingError from e

    def _fifth_subplot_selection(self):
        """Return the lookup key for ``trace_subplot5`` (None selects the heater traces)."""
        if self.trace_subplot5 in ('logger_sens', 'chamber_sens'):
            return self.trace_subplot5
        return None

    def config_fifth_subplot(self):
        """Return legend and axis labels of the fifth subplot.

        Returns:
            dict with the keys 'label_trace_1', 'label_trace_2', 'y_axis' and 'sec_y_axis'.
        """
        values = self._FIFTH_SUBPLOT_VALUES[self._fifth_subplot_selection()]
        return dict(zip(self._FIFTH_SUBPLOT_KEYS, values))

    def refresh_param_fifth_subplot(self, data_frame):
        """Return the two data frame columns shown in the fifth subplot.

        Returns:
            tuple ``(val_trace_1, val_trace_2)`` for the primary and secondary y-axis.
        """
        column_1, column_2 = self._FIFTH_SUBPLOT_COLUMNS[self._fifth_subplot_selection()]
        return getattr(data_frame, column_1), getattr(data_frame, column_2)

    @staticmethod
    def get_extended_min_max(array):
        """Return ``(min, max)`` of *array*, widened by 5 % of the range on both sides.

        A range of exactly 0 is treated as 1, so constant data gets +/- 0.05.
        If *array* is empty or holds no valid value (min or max is missing), the
        finite fallback ``(-0.05, 0.05)`` is returned, so the result is always
        usable as axis limits.
        """
        try:
            lowest, highest = array.min(), array.max()
        except ValueError:
            # numpy refuses to reduce an empty array
            return MeasurementPlot._EMPTY_LIMITS
        if pd.isna(lowest) or pd.isna(highest):
            # pandas returns NaN for empty or all-NaN data
            return MeasurementPlot._EMPTY_LIMITS

        distance = highest - lowest
        if distance == 0.:
            distance = 1
        margin = 0.05*distance
        return lowest-margin, highest+margin
# test procedure for measurement plot procedure
if __name__ == '__main__':
    # Manual demo: feed 20 steps of synthetic data into the plot, one step at a time.
    # Possible selections for trace_subplot5:
    #   chamber_sens
    #   logger_sens
    #   heater_dut_chamber
    plot = MeasurementPlot(['S21_PHASE', 'S21_MAGNITUDE'], trace_subplot5="chamber_sens")
    plt.ion()

    rows = []
    # FIXME: The loop should run in a separate thread and use draw_in_other_thread
    for step in range(20):
        # synthetic data point for this step
        rows.append({
            'TIMESTAMP': step,
            'READBACK_TEMPERATURE': 25 - step,
            'READBACK_HUMIDITY': 10 + 0.1*step,
            'EQUILIBRIUM_INDICATOR': step % 4,
            'S21_PHASE': 20 - 2*step,
            'S21_MAGNITUDE': 0.3*step,
            'TEMP_HEATER': 10,
            'HUM_HEATER': 3,
            'TEMP_DUT': step,
            'TEMP_ROOM': 25-step,
            'HUM_DUT': 40,
            'HUM_ROOM': 45,
            'AIR_PRESS_ROOM': 1000+10*step,
            'TEMP_MEAS_INSTR': 40-1.5*step,
            'HUM_MEAS_INSTR': 55,
            'READBACK_TEMP_MEAS_INSTR': 50-2*step,
            'READBACK_HUM_MEAS_INSTR': 39
        })
        demo_frame = pd.DataFrame(rows)

        # redraw with everything collected so far, then report the step number
        plot.draw_in_this_thread(demo_frame)
        print(step)
        time.sleep(0.3)

    print('I am done. ')

    # final draw with interactive mode switched off, also written to a PDF
    plt.ioff()
    plot.draw_in_this_thread(demo_frame, 'the.pdf')