# (repository-browser navigation residue "Newer" / "Older" - not part of the program)
import multiprocessing as mp
import queue
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Different exceptions can be thrown while plotting, depending on the backend.
# We catch them all locally and raise our own exception instead
class PlottingError(Exception):
    """Raised when plotting fails.

    Backend-specific exceptions raised while drawing are wrapped in this type
    so that callers only have to handle a single exception class.
    """
def __init__(self, reference_signal_names, title='', trace_subplot5='', legend_loc='upper left',
             legend_bbox_to_anchor=(1.09, 1)):
    """Build the figure with five stacked subplots and the queue that feeds the drawing loop.

    Args:
        reference_signal_names: sequence whose first two entries are used as
            legend labels / y-axis titles of the first subplot (left and right axis).
        title: text appended to "Measurement " in the figure title.
        trace_subplot5: selector stored for the fifth subplot; it is evaluated
            by ``config_fifth_subplot``.
        legend_loc: ``loc`` argument used for every subplot legend.
        legend_bbox_to_anchor: ``bbox_to_anchor`` argument used for every subplot legend.
    """
    # NOTE(review): the enclosing class header (referred to elsewhere in the file as
    # ``MeasurementPlot``) is not visible in this view of the file.
    self.reference_signal_names = reference_signal_names

    # open a separate plot window when running inside an IPython kernel (e.g. started from anaconda)
    if 'ipykernel' in sys.modules:
        from IPython import get_ipython
        get_ipython().run_line_magic('matplotlib', 'qt')

    self.trace_subplot5 = trace_subplot5

    # parameters for the legends of the subplots
    self.legend_loc = legend_loc
    self.legend_bbox_to_anchor = legend_bbox_to_anchor

    def add_legend(axis, path_collections):
        # One combined legend per subplot, listing the traces of all of its y axes.
        labels = [pc.get_label() for pc in path_collections]
        return axis.legend(path_collections, labels, loc=self.legend_loc,
                           bbox_to_anchor=self.legend_bbox_to_anchor)

    self.fig, self.ax1 = plt.subplots(5, figsize=(25, 20))
    self.fig.subplots_adjust(bottom=0.1, right=0.8, hspace=0.4)
    self.fig.suptitle("Measurement "+title, color="red")

    # First plot: signal0 and signal1
    self.path_collection_signal0 = self.ax1[0].scatter([], [], c='red', marker='<',
                                                       label=reference_signal_names[0])
    self.path_collection_fit = self.ax1[0].scatter([], [], c='green', marker='.', label=' ')
    self.signal1_axis = self.ax1[0].twinx()
    self.path_collection_signal1 = self.signal1_axis.scatter([], [], c='#3120E0', marker='4',
                                                             label=reference_signal_names[1])
    self.equi_axis0 = self.ax1[0].twinx()
    # shift the third y axis outwards so it does not overlap the second one
    self.equi_axis0.spines['right'].set_position(('outward', 75))
    self.path_collection_equi0 = self.equi_axis0.scatter([], [], c='black', marker=".",
                                                         label='Equilibrium_Indicator')
    self.ax1[0].set_xlabel("TIMESTAMP")
    self.ax1[0].set_ylabel(reference_signal_names[0], color='red')
    self.signal1_axis.set_ylabel(reference_signal_names[1], color='#3120E0')
    self.equi_axis0.set_ylabel("INDICATOR VALUE", color='black')
    # fix range to 0..31 with some extra margin for plotting
    self.equi_axis0.set_ylim(-1, 32)
    self.ax1[0].grid(True, linestyle=":")
    # The original call was truncated after ``loc=...``; it is completed here with the
    # same bbox_to_anchor argument that the other four subplot legends use.
    self.signal0_legend = add_legend(
        self.ax1[0],
        [self.path_collection_signal0, self.path_collection_signal1,
         self.path_collection_equi0, self.path_collection_fit])

    # empty text annotation anchored to the first axes of the figure
    self.annotation = self.fig.axes[0].annotate('', xy=(0, 1),
                                                xycoords='axes fraction', xytext=(-0.16, 1),
                                                textcoords='axes fraction', fontsize='16',
                                                horizontalalignment='left',
                                                verticalalignment='bottom')

    # Second plot: humidity and temperature of climate chamber requested from internal sensors of chamber
    self.path_collection_temp = self.ax1[1].scatter([], [], c='blue', marker='p', label="Chamber Temperature")
    self.humidity_axis = self.ax1[1].twinx()
    self.path_collection_hum = self.humidity_axis.scatter([], [], c='green', marker="*",
                                                          label="Chamber Humidity")
    self.equi_axis1 = self.ax1[1].twinx()
    self.equi_axis1.spines['right'].set_position(('outward', 75))
    self.path_collection_equi1 = self.equi_axis1.scatter([], [], c='black', marker=".",
                                                         label="Equilibrium Indicator")
    self.ax1[1].set_xlabel("TIMESTAMP")
    self.ax1[1].set_ylabel("TEMPERATURE [°C] ", color='blue')
    self.humidity_axis.set_ylabel("HUMIDITY [%RH]", color='green')
    self.equi_axis1.set_ylabel("INDICATOR VALUE", color='black')
    self.equi_axis1.set_ylim(-1, 32)
    self.ax1[1].grid(True, linestyle=":")
    add_legend(self.ax1[1],
               [self.path_collection_temp, self.path_collection_hum, self.path_collection_equi1])

    # Third plot: parameters of external sensors: DUT temperature, DUT humidity
    self.path_collection_temp_dut = self.ax1[2].scatter([], [], c='red', marker='p', label='DUT temperature')
    self.ext_sens_hum_axis = self.ax1[2].twinx()
    self.path_collection_hum_dut = self.ext_sens_hum_axis.scatter([], [], c='purple', marker='*',
                                                                  label='DUT humidity')
    self.ax1[2].set_xlabel("TIMESTAMP")
    self.ax1[2].set_ylabel("TEMPERATURE [°C]", color='red')
    self.ext_sens_hum_axis.set_ylabel("HUMIDITY [%RH]", color='purple')
    self.ax1[2].grid(True, linestyle=":")
    add_legend(self.ax1[2], [self.path_collection_temp_dut, self.path_collection_hum_dut])

    # Fourth plot: parameters of external sensors: room temperature, room humidity, air pressure room
    self.path_collection_temp_room = self.ax1[3].scatter([], [], c='green', marker='*',
                                                         label='room temperature')
    self.sec_ext_hum_sens_axis = self.ax1[3].twinx()
    self.path_collection_hum_room = self.sec_ext_hum_sens_axis.scatter([], [], c='orange', marker='>',
                                                                       label='room humidity')
    self.press_axis = self.ax1[3].twinx()
    self.press_axis.spines['right'].set_position(('outward', 60))
    self.path_collection_air_press_room = self.press_axis.scatter([], [], c='grey', marker='4',
                                                                  label='air pressure room')
    self.ax1[3].set_xlabel("TIMESTAMP")
    self.ax1[3].set_ylabel("TEMPERATURE [°C]", color='green')
    self.sec_ext_hum_sens_axis.set_ylabel("HUMIDITY [%RH]", color='orange')
    self.press_axis.set_ylabel("AIR PRESSURE [mb]", color='grey')
    self.ax1[3].grid(True, linestyle=":")
    add_legend(self.ax1[3],
               [self.path_collection_temp_room, self.path_collection_hum_room,
                self.path_collection_air_press_room])

    # Fifth plot: two traces whose labels and axis titles come from config_fifth_subplot()
    subplot_dict = self.config_fifth_subplot()
    self.path_collection_trace_1 = self.ax1[4].scatter([], [], c='black', marker='p',
                                                       label=subplot_dict.get('label_trace_1'))
    self.sec_plot_param_axis = self.ax1[4].twinx()
    self.path_collection_trace_2 = self.sec_plot_param_axis.scatter([], [], c='brown', marker="*",
                                                                    label=subplot_dict.get('label_trace_2'))
    self.ax1[4].set_xlabel("TIMESTAMP")
    self.ax1[4].set_ylabel(subplot_dict.get('y_axis'), color='black')
    self.sec_plot_param_axis.set_ylabel(subplot_dict.get('sec_y_axis'), color='brown')
    self.ax1[4].grid(True, linestyle=":")
    add_legend(self.ax1[4], [self.path_collection_trace_1, self.path_collection_trace_2])

    plt.rcParams.update({'font.size': 16})

    # bounded queue (10 entries) created from the 'spawn' multiprocessing context;
    # draw_in_other_thread() and stop() put items on it
    ctx = mp.get_context('spawn')
    self.data_queue = ctx.Queue(10)
def stop(self):
    """Queue the stop marker for the drawing loop.

    The marker is the tuple ``(None, None, True)``: no data frame, no PDF name,
    and the stop flag set. ``put`` is used without a timeout, so the call waits
    if the queue is currently full.
    """
    self.data_queue.put((None, None, True))
def draw_in_other_thread(self, data_frame, pdf_name=''):
    """Hand a copy of ``data_frame`` to the drawing loop via the data queue.

    The call never blocks: if the queue is full the frame is dropped on
    purpose, so a slow plot cannot stall the caller.

    Args:
        data_frame: object providing ``copy()``; the copy is what gets queued.
        pdf_name: queued together with the frame; '' means no PDF.
    """
    # Queue a copy so that data appended by the caller afterwards does not
    # change what is being drawn. (For a pandas DataFrame, copy() defaults to a
    # deep copy; a shallow one would be sufficient because data is only appended.)
    frame_copy = data_frame.copy()
    try:
        self.data_queue.put_nowait((frame_copy, pdf_name, False))
    except queue.Full:
        # Deliberate best effort: skip this update, a later call carries newer data.
        pass
@staticmethod
def window_closed(event):
    """Report that the plot window was closed and set the stop-measurement event.

    It is registered as handler for the figure's 'close_event' in drawing_loop().

    Args:
        event: event object supplied by the caller; it is not evaluated.
    """
    print('Plot window closed. Stopping measurement.')
    process_sync.stop_measurement.set()
def drawing_loop(self, stop_measurement_event):
# Purpose: take (data_frame, pdf_name, stop_drawing) tuples from self.data_queue and
# redraw the figure with draw_in_this_thread() for each data frame.
# stop_measurement_event: object offering is_set(); it is also published as
# process_sync.stop_measurement so that the window_closed callback can set it.
#
# NOTE(review): this view of the function is incomplete. Indentation is lost, the
# two lines "Martin Killenberg" / "committed" are version-control residue, and the
# following statements are not visible (presumably dropped by the extraction):
#   - the loop header that the 'continue' below belongs to,
#   - the 'try:' matching the 'except' below,
#   - whatever leaves the loop after the stop marker was received,
#   - the body of the 'except' clause.
# Restore them from version control rather than guessing.
# turn interactive plotting (back) on (is deactivated by starting the process)
plt.ion()
# set the global stop_measurement event so it is known in the window_closed callback
process_sync.stop_measurement = stop_measurement_event
# remember the connection id so the callback can be disconnected again below
window_close_callback = self.fig.canvas.mpl_connect('close_event', MeasurementPlot.window_closed)
Martin Killenberg
committed
# nothing queued: wait 0.1 s, keep the GUI event loop serviced, then poll again
if self.data_queue.empty():
time.sleep(0.1)
if not stop_measurement_event.is_set():
# the canvas might have gone and set stop_measurement_event when closing
self.fig.canvas.flush_events()
continue
data_frame, pdf_name, stop_drawing = self.data_queue.get()
# stop marker (queued by stop()): detach the close-window callback
if stop_drawing:
self.fig.canvas.mpl_disconnect(window_close_callback)
# only draw while the measurement has not been stopped
# NOTE(review): the pdf_name taken from the queue is not forwarded; '' is passed
# instead - confirm that this is intended.
if not stop_measurement_event.is_set():
self.draw_in_this_thread(data_frame, pdf_name='')
except Exception as e:
# Don't exit here. Always drain the data queue so the program can terminate correctly
def draw_in_this_thread(self, data_frame, pdf_name=''):
    """Update all subplots from ``data_frame`` and redraw the figure.

    Args:
        data_frame: provides the columns TIMESTAMP, READBACK_TEMPERATURE,
            READBACK_HUMIDITY, TEMP_DUT, TEMP_ROOM, HUM_DUT, HUM_ROOM,
            AIR_PRESS_ROOM and EQUILIBRIUM_INDICATOR as attributes, the two
            columns named in ``self.reference_signal_names`` via item access,
            and whatever refresh_param_fifth_subplot() reads.
        pdf_name: if not '', the figure is saved under this name.

    Raises:
        PlottingError: if drawing or flushing the canvas fails in interactive mode.
    """
    timestamps = data_frame.TIMESTAMP

    # all five subplots share the same (padded) time range
    x_min, x_max = self.get_extended_min_max(timestamps)
    for axis in self.ax1:
        axis.set_xlim(x_min, x_max)

    val_trace_1, val_trace_2 = self.refresh_param_fifth_subplot(data_frame)

    # (y axis to rescale, scatter collection to refresh, values) for every trace
    traces = (
        # first subplot: signal0 and signal1
        (self.ax1[0], self.path_collection_signal0, data_frame[self.reference_signal_names[0]]),
        (self.signal1_axis, self.path_collection_signal1, data_frame[self.reference_signal_names[1]]),
        # second subplot: chamber temperature and humidity
        (self.ax1[1], self.path_collection_temp, data_frame.READBACK_TEMPERATURE),
        (self.humidity_axis, self.path_collection_hum, data_frame.READBACK_HUMIDITY),
        # third and fourth subplot: temperatures of the external sensors
        (self.ax1[2], self.path_collection_temp_dut, data_frame.TEMP_DUT),
        (self.ax1[3], self.path_collection_temp_room, data_frame.TEMP_ROOM),
        # third and fourth subplot: humidities of the external sensors
        (self.ext_sens_hum_axis, self.path_collection_hum_dut, data_frame.HUM_DUT),
        (self.sec_ext_hum_sens_axis, self.path_collection_hum_room, data_frame.HUM_ROOM),
        # fourth subplot: air pressure
        (self.press_axis, self.path_collection_air_press_room, data_frame.AIR_PRESS_ROOM),
        # fifth subplot: the two configurable traces
        (self.ax1[4], self.path_collection_trace_1, val_trace_1),
        (self.sec_plot_param_axis, self.path_collection_trace_2, val_trace_2),
    )
    for axis, path_collection, values in traces:
        y_min, y_max = self.get_extended_min_max(values)
        axis.set_ylim(y_min, y_max)
        path_collection.set_offsets(np.c_[timestamps, values])

    # the fit trace is emptied on every redraw
    self.path_collection_fit.set_offsets(np.c_[[], []])

    # equilibrium indicator in the first two subplots; its y range is not rescaled here
    equilibrium_points = np.c_[timestamps, data_frame.EQUILIBRIUM_INDICATOR]
    self.path_collection_equi0.set_offsets(equilibrium_points)
    self.path_collection_equi1.set_offsets(equilibrium_points)

    if pdf_name != '':
        self.fig.savefig(pdf_name)

    # NOTE(review): indentation was lost in the reviewed source, so it cannot be told
    # whether show() belonged to the pdf branch above. It is kept unconditional here -
    # confirm against version control.
    plt.show()
    if plt.isinteractive():
        try:
            self.fig.canvas.draw()
            self.fig.canvas.flush_events()
        except Exception as e:
            # backends raise different exception types; report them uniformly
            raise PlottingError from e
def config_fifth_subplot(self):
    """Return the legend labels and y-axis titles of the fifth subplot.

    The texts are selected by ``self.trace_subplot5``: 'logger_sens',
    'chamber_sens', or any other value for the heater activity.

    Returns:
        dict with the keys 'label_trace_1', 'label_trace_2', 'y_axis' and
        'sec_y_axis'.
    """
    keys = ("label_trace_1", "label_trace_2", "y_axis", "sec_y_axis")

    if self.trace_subplot5 == 'logger_sens':
        # environmental conditions in the measurement instrument chamber (almemo sensors)
        values = ("temperature\nalmemo sensors\nmeas instr chamber",
                  "humidity\nalmemo sensors\nmeas instr chamber",
                  "TEMPERATURE [°C]", "HUMIDITY [%RH]")
    elif self.trace_subplot5 == 'chamber_sens':
        # environmental conditions in the measurement instrument chamber (chamber sensors)
        values = ("temperature\nchamber sensors\nmeas instr chamber",
                  "humidity\nchamber sensors\nmeas instr chamber",
                  "TEMPERATURE [°C]", "HUMIDITY [%RH]")
    else:
        # heater activity in fifth subplot
        values = ("Temp Heater\nDUT chamber", "Humidity Heater\n DUT chamber",
                  "ACTIVITY [%]", "ACTIVITY [%]")

    return dict(zip(keys, values))
def refresh_param_fifth_subplot(self, data_frame):
    """Pick the two data columns shown in the fifth subplot.

    The choice follows ``self.trace_subplot5``; only the two selected columns
    are read from ``data_frame``.

    Args:
        data_frame: object exposing the selected columns as attributes.

    Returns:
        tuple ``(val_trace_1, val_trace_2)``.
    """
    if self.trace_subplot5 == 'logger_sens':
        # sensor values TEMP_MEAS_INSTR / HUM_MEAS_INSTR
        val_trace_1 = data_frame.TEMP_MEAS_INSTR
        val_trace_2 = data_frame.HUM_MEAS_INSTR
    elif self.trace_subplot5 == 'chamber_sens':
        # readback values READBACK_TEMP_MEAS_INSTR / READBACK_HUM_MEAS_INSTR
        val_trace_1 = data_frame.READBACK_TEMP_MEAS_INSTR
        val_trace_2 = data_frame.READBACK_HUM_MEAS_INSTR
    else:
        # every other selection: heater columns TEMP_HEATER / HUM_HEATER
        val_trace_1 = data_frame.TEMP_HEATER
        val_trace_2 = data_frame.HUM_HEATER

    return val_trace_1, val_trace_2
# add 5 % of the distance between min and max to the range
@staticmethod
def get_extended_min_max(array):
    """Return ``(low, high)``: the value range of ``array`` widened by 5 % on each side.

    Args:
        array: object providing ``min()`` and ``max()``.

    Returns:
        tuple ``(min - 0.05*distance, max + 0.05*distance)`` where ``distance``
        is ``max - min``. If all values are equal, a distance of 1 is used so
        that the two limits differ.
    """
    lowest = array.min()
    highest = array.max()
    distance = highest - lowest
    if distance == 0.:
        # Constant data: the statement under this condition was missing in the reviewed
        # source. A nominal distance is substituted so the limits do not collapse.
        distance = 1
    return lowest - 0.05*distance, highest + 0.05*distance
# test procedure for measurement plot procedure
if __name__ == '__main__':
    # Manual demo: feeds 20 synthetic samples into the plot, one every 0.3 s,
    # then redraws once more non-interactively and saves 'the.pdf'.
    #
    # possible selections trace subplot 5
    #   chamber_sens
    #   logger_sens
    #   heater_dut_chamber
    m = MeasurementPlot(['S21_PHASE', 'S21_MAGNITUDE'], trace_subplot5="chamber_sens")
    plt.ion()
    measurements = []
    # FIXME: The loop should run in a separate thread and use draw_in_other_thread
    for i in range(20):
        measurement = {
            'TIMESTAMP': i,
            'READBACK_TEMPERATURE': 25 - i,
            'READBACK_HUMIDITY': 10 + 0.1*i,
            'EQUILIBRIUM_INDICATOR': i % 4,
            'S21_PHASE': 20 - 2*i,
            # NOTE(review): this entry was missing in the reviewed source although the plot
            # reads the 'S21_MAGNITUDE' column; the value is an arbitrary test ramp.
            'S21_MAGNITUDE': 0.1*i,
            'TEMP_HEATER': 10,
            'HUM_HEATER': 3,
            'TEMP_DUT': i,
            'TEMP_ROOM': 25-i,
            'HUM_DUT': 40,
            'HUM_ROOM': 45,
            'AIR_PRESS_ROOM': 1000+10*i,
            'TEMP_MEAS_INSTR': 40-1.5*i,
            'HUM_MEAS_INSTR': 55,
            'READBACK_TEMP_MEAS_INSTR': 50-2*i,
            'READBACK_HUM_MEAS_INSTR': 39,
        }
        measurements.append(measurement)
        my_data_frame = pd.DataFrame(measurements)
        # plot of data frame with test data for actual step
        m.draw_in_this_thread(my_data_frame)
        print(str(i))
        time.sleep(0.3)

    print('I am done. ')
    # final non-interactive redraw that also writes the PDF
    plt.ioff()
    m.draw_in_this_thread(my_data_frame, 'the.pdf')