Newer
Older
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import time
import multiprocessing as mp
import queue
import process_sync
# Different exceptions can be thrown while plotting, depending on the backend.
# We catch them all locally and raise our own exception instead
class PlottingError(Exception):
"Raised when plotting fails"
pass
def __init__(self, reference_signal_names, title='', trace_subplot5 ='', legend_loc = 'upper left',\
Michael Pawelzik
committed
legend_bbox_to_anchor = (1.09, 1)):
self.reference_signal_names = reference_signal_names
# set python for opening an separate plot window when starting from anaconda
if 'ipykernel' in sys.modules:
from IPython import get_ipython
get_ipython().run_line_magic('matplotlib', 'qt')
Michael Pawelzik
committed
self.trace_subplot5 = trace_subplot5
Michael Pawelzik
committed
Michael Pawelzik
committed
# parameter for legend of subplots
self.legend_loc = legend_loc
self.legend_bbox_to_anchor = legend_bbox_to_anchor
self.fig, self.ax1 = plt.subplots(5, figsize=(25, 20))
Michael Pawelzik
committed
self.fig.subplots_adjust(bottom= 0.1, right=0.8, hspace = 0.4)
self.fig.suptitle("Measurement "+title, color="red")
# First plot: signal0 and signal1
self.path_collection_signal0 = self.ax1[0].scatter([], [], c='red', marker='<', label=reference_signal_names[0])
self.path_collection_fit = self.ax1[0].scatter([], [], c='green', marker='.', label = ' ')
self.signal1_axis = self.ax1[0].twinx()
self.path_collection_signal1 = self.signal1_axis.scatter([], [], c='#3120E0', marker='4', label=reference_signal_names[1])
self.equi_axis0 = self.ax1[0].twinx()
self.equi_axis0.spines['right'].set_position(('outward', 75))
self.path_collection_equi0 = self.equi_axis0.scatter([], [], c='black', marker=".", label='Equilibrium_Indicator')
self.ax1[0].set_xlabel("TIMESTAMP")
self.ax1[0].set_ylabel(reference_signal_names[0], color='red')
self.signal1_axis.set_ylabel(reference_signal_names[1], color='#3120E0')
self.equi_axis0.set_ylabel("INDICATOR VALUE", color='black')
# fix range to 0..31 with some extra margin for plotting
self.equi_axis0.set_ylim(-1, 32)
self.ax1[0].grid(True, linestyle=":")
all_path_collections = [self.path_collection_signal0, self.path_collection_signal1,
self.path_collection_equi0, self.path_collection_fit]
labels = [pc.get_label() for pc in all_path_collections]
self.signal0_legend = self.ax1[0].legend(all_path_collections, labels, loc=self.legend_loc,
Michael Pawelzik
committed
ax = self.fig.axes
self.annotation = ax[0].annotate('', xy=(0, 1),
xycoords='axes fraction', xytext=(-0.16, 1),
textcoords='axes fraction', fontsize='16',
horizontalalignment='left', verticalalignment='bottom')
# Second plot: Humidity and temperature of climate chamber requested from internal sensors of chamber
self.path_collection_temp = self.ax1[1].scatter([], [], c='blue', marker='p', label="Chamber Temperature")
self.humidity_axis = self.ax1[1].twinx()
self.path_collection_hum = self.humidity_axis.scatter([], [], c='green', marker="*", label="Chamber Humidity")
self.equi_axis1 = self.ax1[1].twinx()
self.equi_axis1.spines['right'].set_position(('outward', 75))
self.path_collection_equi1 = self.equi_axis1.scatter([], [], c='black', marker=".",
label="Equilibrium Indicator")
self.ax1[1].set_xlabel("TIMESTAMP")
self.ax1[1].set_ylabel("TEMPERATURE [°C] ", color='blue')
self.humidity_axis.set_ylabel("HUMIDITY [%RH]", color='green')
self.equi_axis1.set_ylabel("INDICATOR VALUE", color='black')
self.equi_axis1.set_ylim(-1, 32)
self.ax1[1].grid(True, linestyle=":")
all_path_collections = [self.path_collection_temp, self.path_collection_hum, self.path_collection_equi1]
labels = [pc.get_label() for pc in all_path_collections]
self.ax1[1].legend(all_path_collections, labels, loc=self.legend_loc, bbox_to_anchor=self.legend_bbox_to_anchor)
# Third plot: parameter of external sensors DUT temperature, DUT humidity
self.path_collection_temp_dut = self.ax1[2].scatter([], [], c='red', marker='p', label='DUT temperature')
self.path_collection_temp_dut2 = self.ax1[2].scatter([],[], c= 'green', marker = '.', label = 'DUT2 temeprature')
self.ext_sens_hum_axis = self.ax1[2].twinx()
self.path_collection_hum_dut = self.ext_sens_hum_axis.scatter([], [], c='purple', marker='*',
label='DUT humidity')
self.path_collection_hum_dut2 = self.ext_sens_hum_axis.scatter([],[], c='grey', marker='4',
label ='DUT2 humidity')
self.ax1[2].set_xlabel("TIMESTAMP")
self.ax1[2].set_ylabel("TEMPERATURE [°C]", color='red')
self.ext_sens_hum_axis.set_ylabel("HUMIDITY [%RH]", color = 'purple')
self.ax1[2].grid(True, linestyle=":")
all_path_collections = [self.path_collection_temp_dut, self.path_collection_hum_dut,
self.path_collection_temp_dut2, self.path_collection_hum_dut2]
labels = [pc.get_label() for pc in all_path_collections]
self.ax1[2].legend(all_path_collections, labels, loc=self.legend_loc, bbox_to_anchor=self.legend_bbox_to_anchor)
# Forth plot: parameter of external sensors: room temperature, room humidity , air pressure room
self.path_collection_temp_room = self.ax1[3].scatter([], [], c='green', marker='*', label='room temperature')
self.sec_ext_hum_sens_axis = self.ax1[3].twinx()
self.path_collection_hum_room = self.sec_ext_hum_sens_axis.scatter([], [], c='orange', marker='>',
label='room humidity')
self.press_axis = self.ax1[3].twinx()
Michael Pawelzik
committed
self.press_axis.spines['right'].set_position(('outward', 60))
self.path_collection_air_press_room = self.press_axis.scatter([], [], c='grey', marker='4',
label='air pressure room')
self.ax1[3].set_xlabel("TIMESTAMP")
self.ax1[3].set_ylabel("TEMPERATURE [°C]", color='green')
self.sec_ext_hum_sens_axis.set_ylabel("HUMIDITY [%RH]", color='orange')
self.press_axis.set_ylabel("AIR PRESSURE [mb]", color='grey')
self.ax1[3].grid(True, linestyle=":")
all_path_collections = [self.path_collection_temp_room, self.path_collection_hum_room,
self.path_collection_air_press_room]
labels = [pc.get_label() for pc in all_path_collections]
self.ax1[3].legend(all_path_collections, labels, loc=self.legend_loc, bbox_to_anchor=self.legend_bbox_to_anchor)
# Fifth plot: parameter of external sensors: meas instruments temperature, meas instruments humidity
subplot_dict = self.config_fifth_subplot()
self.path_collection_trace_1 = self.ax1[4].scatter([], [], c='black', marker='p',
label=subplot_dict.get('label_trace_1'))
Michael Pawelzik
committed
self.sec_plot_param_axis = self.ax1[4].twinx()
self.path_collection_trace_2 = self.sec_plot_param_axis.scatter([], [], c='brown', marker="*",
label=subplot_dict.get('label_trace_2'))
Michael Pawelzik
committed
self.ax1[4].set_xlabel("TIMESTAMP")
self.ax1[4].set_ylabel(subplot_dict.get('y_axis'), color='black')
self.sec_plot_param_axis.set_ylabel(subplot_dict.get('sec_y_axis'), color='brown')
Michael Pawelzik
committed
self.ax1[4].grid(True, linestyle=":")
all_path_collections = [self.path_collection_trace_1, self.path_collection_trace_2]
Michael Pawelzik
committed
labels = [pc.get_label() for pc in all_path_collections]
self.ax1[4].legend(all_path_collections, labels, loc=self.legend_loc, bbox_to_anchor=self.legend_bbox_to_anchor)
plt.rcParams.update({'font.size': 16})
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
ctx = mp.get_context('spawn')
self.data_queue = ctx.Queue(10)
def stop(self):
self.data_queue.put((None, None, True))
def draw_in_other_thread(self, data_frame, pdf_name=''):
# A shallow copy is enough (although not strictly 100 % thread safe). Data is only appended and
# the data members are not altered.
frame_copy = data_frame.copy()
try:
self.data_queue.put_nowait((frame_copy, pdf_name, False))
except queue.Full:
pass
@staticmethod
def window_closed(event):
print('Plot window closed. Stopping measurement.')
process_sync.stop_measurement.set()
def drawing_loop(self, stop_measurement_event):
# turn interactive plotting (back) on (is deactivated by starting the process)
plt.ion()
# set the global stop_measurement event so it is known in the window_closed callback
process_sync.stop_measurement = stop_measurement_event
window_close_callback = self.fig.canvas.mpl_connect('close_event', MeasurementPlot.window_closed)
while True:
if self.data_queue.empty():
time.sleep(0.1)
if not stop_measurement_event.is_set():
#the canvas might have gone and set stop_measurement_event when closing
self.fig.canvas.flush_events()
continue
data_frame, pdf_name, stop_drawing = self.data_queue.get()
if stop_drawing:
self.fig.canvas.mpl_disconnect(window_close_callback)
return
try:
if not stop_measurement_event.is_set():
self.draw_in_this_thread(data_frame, pdf_name='')
except Exception as e:
# Don't exit here. Always drain the data queue so the program can terminate correctly
stop_measurement_event.set()
def draw_in_this_thread(self, data_frame, pdf_name=''):
timestamps = data_frame.TIMESTAMP
minimum, maximum = self.get_extended_min_max(timestamps)
self.ax1[0].set_xlim(minimum, maximum)
self.ax1[1].set_xlim(minimum, maximum)
self.ax1[2].set_xlim(minimum, maximum)
self.ax1[3].set_xlim(minimum, maximum)
self.ax1[4].set_xlim(minimum, maximum)
# refresh data for signal0 in subplot for signal0 and signal1
signal0 = data_frame[self.reference_signal_names[0]]
minimum, maximum = self.get_extended_min_max(signal0)
self.ax1[0].set_ylim(minimum, maximum)
self.path_collection_signal0.set_offsets(np.c_[timestamps, signal0])
self.path_collection_fit.set_offsets(np.c_[[], []])
# refresh data for signal1 in subplot for signal0 and signal1
signal1s = data_frame[self.reference_signal_names[1]]
minimum, maximum = self.get_extended_min_max(signal1s)
self.signal1_axis.set_ylim(minimum, maximum)
self.path_collection_signal1.set_offsets(np.c_[timestamps, signal1s])
# refresh data for chamber temperature in subplot for chamber temperature and humidity
temperatures = data_frame.READBACK_TEMPERATURE
minimum, maximum = self.get_extended_min_max(temperatures)
self.ax1[1].set_ylim(minimum, maximum)
self.path_collection_temp.set_offsets(np.c_[timestamps, temperatures])
# refresh data for chamber humidity in subplot for chamber temperature and humidity
humidities = data_frame.READBACK_HUMIDITY
minimum, maximum = self.get_extended_min_max(humidities)
self.humidity_axis.set_ylim(minimum, maximum)
self.path_collection_hum.set_offsets(np.c_[timestamps, humidities])
# refresh temperatures for used external sensors in subplots
temp_dut = data_frame.TEMP_DUT
temp_room = data_frame.TEMP_ROOM
minimum, maximum = self.get_extended_min_max(temp_dut)
self.ax1[2].set_ylim(minimum, maximum)
self.path_collection_temp_dut.set_offsets(np.c_[timestamps, temp_dut])
minimum, maximum = self.get_extended_min_max(temp_room)
self.ax1[3].set_ylim(minimum, maximum)
self.path_collection_temp_room.set_offsets(np.c_[timestamps, temp_room])
Michael Pawelzik
committed
# refresh humidities external sensors in subplots for DUT humidity and room humidity
hum_dut = data_frame.HUM_DUT
hum_room = data_frame.HUM_ROOM
minimum, maximum = self.get_extended_min_max(hum_dut)
self.ext_sens_hum_axis.set_ylim(minimum, maximum)
minimum, maximum = self.get_extended_min_max(hum_room)
self.sec_ext_hum_sens_axis.set_ylim(minimum, maximum)
self.path_collection_hum_dut.set_offsets(np.c_[timestamps, hum_dut])
self.path_collection_hum_room.set_offsets(np.c_[timestamps, hum_room])
# refresh air pressure of external sensor in subplot for air pressure room
air_press_room = data_frame.AIR_PRESS_ROOM
minimum, maximum = self.get_extended_min_max(air_press_room)
self.press_axis.set_ylim(minimum, maximum)
self.path_collection_air_press_room.set_offsets(np.c_[timestamps, air_press_room])
# refresh temperature and humidity of external sensor in subplot for measurement
# for instrument temperature and measurement instrument humidity
val_trace_1, val_trace_2 = self.refresh_param_fifth_subplot(data_frame)
Michael Pawelzik
committed
minimum, maximum = self.get_extended_min_max(val_trace_1)
self.ax1[4].set_ylim(minimum, maximum)
minimum, maximum = self.get_extended_min_max(val_trace_2)
self.sec_plot_param_axis.set_ylim(minimum, maximum)
self.path_collection_trace_1.set_offsets(np.c_[timestamps, val_trace_1])
self.path_collection_trace_2.set_offsets(np.c_[timestamps, val_trace_2])
self.path_collection_equi0.set_offsets(np.c_[timestamps, data_frame.EQUILIBRIUM_INDICATOR])
self.path_collection_equi1.set_offsets(np.c_[timestamps, data_frame.EQUILIBRIUM_INDICATOR])
if not pdf_name == '':
self.fig.savefig(pdf_name)
plt.show()
if plt.isinteractive():
try:
self.fig.canvas.draw()
self.fig.canvas.flush_events()
except Exception as e:
raise PlottingError from e
Michael Pawelzik
committed
Michael Pawelzik
committed
Michael Pawelzik
committed
keys = ["label_trace_1", "label_trace_2", "y_axis", "sec_y_axis"]
Michael Pawelzik
committed
if self.trace_subplot5 == 'logger_sens':
Michael Pawelzik
committed
# values for plotting evnironmental conditions in measurement instrument chamber
values = ["temperature\nalmemo sensors\nmeas instr chamber",
"humidity\nalmemo sensors\nmeas instr chamber",
"TEMPERATURE [°C]", "HUMIDITY [%RH]"]
Michael Pawelzik
committed
Michael Pawelzik
committed
elif self.trace_subplot5 == 'chamber_sens':
# values for plotting environmental conditions in measurement instrument chamber
values = ["temperature\nchamber sensors\nmeas instr chamber",
"humidity\nchamber sensors\nmeas instr chamber",
"TEMPERATURE [°C]", "HUMIDITY [%RH]"]
Michael Pawelzik
committed
else:
# values for plotting heater activity in fifth subplot
values = ["Temp Heater\nDUT chamber", "Humidity Heater\n DUT chamber",
"ACTIVITY [%]","ACTIVITY [%]"]
Michael Pawelzik
committed
config_subplot_dict = dict(zip(keys, values))
return config_subplot_dict
def refresh_param_fifth_subplot(self, data_frame):
Michael Pawelzik
committed
Michael Pawelzik
committed
if self.trace_subplot5 == 'logger_sens':
Michael Pawelzik
committed
# chose sensor values for refreshing fifth subplot
Michael Pawelzik
committed
val_trace_1 = data_frame.TEMP_MEAS_INSTR
val_trace_2 = data_frame.HUM_MEAS_INSTR
Michael Pawelzik
committed
elif self.trace_subplot5 == 'chamber_sens':
Michael Pawelzik
committed
val_trace_1 = data_frame.READBACK_TEMP_MEAS_INSTR
val_trace_2 = data_frame.READBACK_HUM_MEAS_INSTR
else:
Michael Pawelzik
committed
val_trace_1 = data_frame.TEMP_HEATER
val_trace_2 = data_frame.HUM_HEATER
Michael Pawelzik
committed
return val_trace_1, val_trace_2
# add 5 % of the distance between min and max to the range
@staticmethod
def get_extended_min_max(array):
distance = array.max() - array.min()
if distance == 0.:
return array.min()-0.05*distance, array.max()+0.05*distance
# test procedure for measurement plot procedure
if __name__ == '__main__':
Michael Pawelzik
committed
# possible selections trace subplot 5
# chamber_sens
# logger_sens
# heater_dut_chamber
m = MeasurementPlot(['S21_PHASE', 'S21_MAGNITUDE'], trace_subplot5="chamber_sens")
plt.ion()
measurements = []
#FIXME: The loop should run in a separate thread and use draw_in_other_thread
for i in range(20):
measurement = {
'TIMESTAMP': i,
'READBACK_TEMPERATURE': 25 - i,
'READBACK_HUMIDITY': 10 + 0.1*i,
'EQUILIBRIUM_INDICATOR': i % 4,
'S21_PHASE': 20 - 2*i,
Michael Pawelzik
committed
'TEMP_HEATER': 10,
'HUM_HEATER': 3,
'TEMP_DUT': i,
'TEMP_ROOM': 25-i,
'HUM_DUT': 40,
'HUM_ROOM': 45,
'AIR_PRESS_ROOM': 1000+10*i,
'TEMP_MEAS_INSTR': 40-1.5*i,
Michael Pawelzik
committed
'HUM_MEAS_INSTR': 55,
'READBACK_TEMP_MEAS_INSTR': 50-2*i,
'READBACK_HUM_MEAS_INSTR': 39
measurements.append(measurement)
my_data_frame = pd.DataFrame(measurements)
# plot of data frame with test data for actual step
m.draw_in_this_thread(my_data_frame)
print(str(i))
time.sleep(0.3)
print('I am done. ')
plt.ioff()
m.draw_in_this_thread(my_data_frame, 'the.pdf')