"""Live plot of climate-chamber measurement data in five stacked subplots.

The module provides :class:`MeasurementPlot`, which can either be updated
directly (``draw_in_this_thread``) or fed through a bounded queue that is
consumed by ``drawing_loop``.
"""
import multiprocessing as mp
import queue
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import process_sync


# Different exceptions can be thrown while plotting, depending on the backend.
# We catch them all locally and raise our own exception instead
class PlottingError(Exception):
    """Raised when plotting fails."""


class MeasurementPlot:
    """Figure with five subplots showing the progress of a measurement.

    Subplots (top to bottom):
      1. the two reference signals plus the equilibrium indicator
      2. chamber temperature / humidity plus the equilibrium indicator
      3. DUT temperature / humidity
      4. room temperature / humidity / air pressure
      5. configurable traces, selected by ``trace_subplot5``
    """

    def __init__(self, reference_signal_names, title='', trace_subplot5='',
                 legend_loc='upper left', legend_bbox_to_anchor=(1.09, 1)):
        """Create the figure, all axes, the (empty) scatter plots and the data queue.

        Parameters:
            reference_signal_names: sequence whose entries ``[0]`` and ``[1]``
                are used as legend/axis labels of the first subplot and as
                column names when reading the data frame.
            title: text appended to the figure title ``"Measurement "``.
            trace_subplot5: ``'logger_sens'`` or ``'chamber_sens'`` select the
                corresponding sensor traces for the fifth subplot; any other
                value selects the heater activity traces.
            legend_loc: ``loc`` argument passed to every subplot legend.
            legend_bbox_to_anchor: ``bbox_to_anchor`` argument passed to every
                subplot legend.
        """
        self.reference_signal_names = reference_signal_names
        # set python for opening a separate plot window when starting from anaconda
        if 'ipykernel' in sys.modules:
            from IPython import get_ipython
            get_ipython().run_line_magic('matplotlib', 'qt')

        self.trace_subplot5 = trace_subplot5

        # parameter for legend of subplots
        self.legend_loc = legend_loc
        self.legend_bbox_to_anchor = legend_bbox_to_anchor

        # Prepare for five subplots
        self.fig, self.ax1 = plt.subplots(5, figsize=(25, 20))
        self.fig.subplots_adjust(bottom=0.1, right=0.8, hspace=0.4)
        self.fig.suptitle("Measurement "+title, color="red")

        # First plot: signal0 and signal1
        self.path_collection_signal0 = self.ax1[0].scatter(
            [], [], c='red', marker='<', label=reference_signal_names[0])
        self.path_collection_fit = self.ax1[0].scatter(
            [], [], c='green', marker='.', label=' ')
        self.signal1_axis = self.ax1[0].twinx()
        self.path_collection_signal1 = self.signal1_axis.scatter(
            [], [], c='#3120E0', marker='4', label=reference_signal_names[1])
        self.equi_axis0 = self.ax1[0].twinx()
        # shift the third y axis outwards so it does not overlap the second one
        self.equi_axis0.spines['right'].set_position(('outward', 75))
        self.path_collection_equi0 = self.equi_axis0.scatter(
            [], [], c='black', marker=".", label='Equilibrium_Indicator')

        self.ax1[0].set_xlabel("TIMESTAMP")
        self.ax1[0].set_ylabel(reference_signal_names[0], color='red')
        self.signal1_axis.set_ylabel(reference_signal_names[1], color='#3120E0')
        self.equi_axis0.set_ylabel("INDICATOR VALUE", color='black')
        # fix range to 0..31 with some extra margin for plotting
        self.equi_axis0.set_ylim(-1, 32)
        self.ax1[0].grid(True, linestyle=":")
        self.signal0_legend = self._add_legend(
            self.ax1[0],
            [self.path_collection_signal0, self.path_collection_signal1,
             self.path_collection_equi0, self.path_collection_fit])
        ax = self.fig.axes
        # empty annotation placed left of the first subplot's top edge
        self.annotation = ax[0].annotate(
            '', xy=(0, 1), xycoords='axes fraction', xytext=(-0.16, 1),
            textcoords='axes fraction', fontsize='16',
            horizontalalignment='left', verticalalignment='bottom')

        # Second plot: Humidity and temperature of climate chamber requested from
        # internal sensors of chamber
        self.path_collection_temp = self.ax1[1].scatter(
            [], [], c='blue', marker='p', label="Chamber Temperature")
        self.humidity_axis = self.ax1[1].twinx()
        self.path_collection_hum = self.humidity_axis.scatter(
            [], [], c='green', marker="*", label="Chamber Humidity")
        self.equi_axis1 = self.ax1[1].twinx()
        self.equi_axis1.spines['right'].set_position(('outward', 75))
        self.path_collection_equi1 = self.equi_axis1.scatter(
            [], [], c='black', marker=".", label="Equilibrium Indicator")

        self.ax1[1].set_xlabel("TIMESTAMP")
        self.ax1[1].set_ylabel("TEMPERATURE [°C] ", color='blue')
        self.humidity_axis.set_ylabel("HUMIDITY [%RH]", color='green')
        self.equi_axis1.set_ylabel("INDICATOR VALUE", color='black')
        self.equi_axis1.set_ylim(-1, 32)
        self.ax1[1].grid(True, linestyle=":")
        self._add_legend(
            self.ax1[1],
            [self.path_collection_temp, self.path_collection_hum,
             self.path_collection_equi1])

        # Third plot: parameter of external sensors DUT temperature, DUT humidity
        self.path_collection_temp_dut = self.ax1[2].scatter(
            [], [], c='red', marker='p', label='DUT temperature')
        self.ext_sens_hum_axis = self.ax1[2].twinx()
        self.path_collection_hum_dut = self.ext_sens_hum_axis.scatter(
            [], [], c='purple', marker='*', label='DUT humidity')

        self.ax1[2].set_xlabel("TIMESTAMP")
        self.ax1[2].set_ylabel("TEMPERATURE [°C]", color='red')
        self.ext_sens_hum_axis.set_ylabel("HUMIDITY [%RH]", color='purple')
        self.ax1[2].grid(True, linestyle=":")
        self._add_legend(
            self.ax1[2],
            [self.path_collection_temp_dut, self.path_collection_hum_dut])

        # Fourth plot: parameter of external sensors: room temperature,
        # room humidity, air pressure room
        self.path_collection_temp_room = self.ax1[3].scatter(
            [], [], c='green', marker='*', label='room temperature')
        self.sec_ext_hum_sens_axis = self.ax1[3].twinx()
        self.path_collection_hum_room = self.sec_ext_hum_sens_axis.scatter(
            [], [], c='orange', marker='>', label='room humidity')
        self.press_axis = self.ax1[3].twinx()
        self.press_axis.spines['right'].set_position(('outward', 60))
        self.path_collection_air_press_room = self.press_axis.scatter(
            [], [], c='grey', marker='4', label='air pressure room')

        self.ax1[3].set_xlabel("TIMESTAMP")
        self.ax1[3].set_ylabel("TEMPERATURE [°C]", color='green')
        self.sec_ext_hum_sens_axis.set_ylabel("HUMIDITY [%RH]", color='orange')
        self.press_axis.set_ylabel("AIR PRESSURE [mb]", color='grey')
        self.ax1[3].grid(True, linestyle=":")
        self._add_legend(
            self.ax1[3],
            [self.path_collection_temp_room, self.path_collection_hum_room,
             self.path_collection_air_press_room])

        # Fifth plot: traces and axis labels depend on self.trace_subplot5
        subplot_dict = self.config_fifth_subplot()
        self.path_collection_trace_1 = self.ax1[4].scatter(
            [], [], c='black', marker='p', label=subplot_dict.get('label_trace_1'))
        self.sec_plot_param_axis = self.ax1[4].twinx()
        self.path_collection_trace_2 = self.sec_plot_param_axis.scatter(
            [], [], c='brown', marker="*", label=subplot_dict.get('label_trace_2'))

        self.ax1[4].set_xlabel("TIMESTAMP")
        self.ax1[4].set_ylabel(subplot_dict.get('y_axis'), color='black')
        self.sec_plot_param_axis.set_ylabel(subplot_dict.get('sec_y_axis'), color='brown')
        self.ax1[4].grid(True, linestyle=":")
        self._add_legend(
            self.ax1[4],
            [self.path_collection_trace_1, self.path_collection_trace_2])

        plt.rcParams.update({'font.size': 16})

        # Bounded queue (10 entries) used to hand data frames to drawing_loop.
        ctx = mp.get_context('spawn')
        self.data_queue = ctx.Queue(10)

    def _add_legend(self, axis, path_collections):
        """Attach a legend listing ``path_collections`` to ``axis`` and return it."""
        labels = [pc.get_label() for pc in path_collections]
        return axis.legend(path_collections, labels, loc=self.legend_loc,
                           bbox_to_anchor=self.legend_bbox_to_anchor)

    def stop(self):
        """Queue the stop marker that makes ``drawing_loop`` return."""
        self.data_queue.put((None, None, True))

    def draw_in_other_thread(self, data_frame, pdf_name=''):
        """Queue a copy of ``data_frame`` for plotting by ``drawing_loop``.

        If the queue is full the frame is dropped silently; the caller is
        never blocked.

        Parameters:
            data_frame: pandas data frame with the measurement columns.
            pdf_name: file name passed on to ``draw_in_this_thread``; the
                empty string means that no file is written.
        """
        # Hand over a copy so the queued data is decoupled from the caller's
        # frame. Data is only appended and the data members are not altered.
        # NOTE: DataFrame.copy() defaults to deep=True according to the pandas
        # documentation, so this is a deep copy, not a shallow one.
        frame_copy = data_frame.copy()
        try:
            self.data_queue.put_nowait((frame_copy, pdf_name, False))
        except queue.Full:
            # Deliberate best effort: skipping a frame only delays the display.
            pass

    @staticmethod
    def window_closed(event):
        """Matplotlib ``close_event`` callback: request the measurement to stop."""
        print('Plot window closed. Stopping measurement.')
        process_sync.stop_measurement.set()

    def drawing_loop(self, stop_measurement_event):
        """Consume the data queue and redraw the figure until ``stop()`` is called.

        Parameters:
            stop_measurement_event: event object providing ``is_set()`` and
                ``set()``. It is set when the plot window is closed or when
                plotting fails; once it is set, queued frames are discarded
                instead of drawn.
        """
        # turn interactive plotting (back) on (is deactivated by starting the process)
        plt.ion()
        # set the global stop_measurement event so it is known in the window_closed callback
        process_sync.stop_measurement = stop_measurement_event
        window_close_callback = self.fig.canvas.mpl_connect(
            'close_event', MeasurementPlot.window_closed)

        while True:
            if self.data_queue.empty():
                time.sleep(0.1)
                if not stop_measurement_event.is_set():
                    # the canvas might have gone and set stop_measurement_event when closing
                    self.fig.canvas.flush_events()
                continue

            data_frame, pdf_name, stop_drawing = self.data_queue.get()
            if stop_drawing:
                self.fig.canvas.mpl_disconnect(window_close_callback)
                return

            try:
                if not stop_measurement_event.is_set():
                    # Forward the queued file name; it used to be replaced by ''
                    # so a requested PDF was never written.
                    self.draw_in_this_thread(data_frame, pdf_name)
            except Exception as e:
                # Don't exit here. Always drain the data queue so the program can
                # terminate correctly. Report the problem instead of hiding it.
                print('Plotting failed, stopping measurement: ' + repr(e),
                      file=sys.stderr)
                stop_measurement_event.set()

    def _update_trace(self, axis, path_collection, timestamps, values):
        """Rescale the y range of ``axis`` to ``values`` and replace the plotted points."""
        minimum, maximum = self.get_extended_min_max(values)
        axis.set_ylim(minimum, maximum)
        path_collection.set_offsets(np.c_[timestamps, values])

    def draw_in_this_thread(self, data_frame, pdf_name=''):
        """Update all subplots from ``data_frame`` and redraw the figure.

        Parameters:
            data_frame: pandas data frame providing the columns ``TIMESTAMP``,
                the two reference signal names, ``READBACK_TEMPERATURE``,
                ``READBACK_HUMIDITY``, ``TEMP_DUT``, ``TEMP_ROOM``, ``HUM_DUT``,
                ``HUM_ROOM``, ``AIR_PRESS_ROOM``, ``EQUILIBRIUM_INDICATOR`` and
                the columns selected by ``trace_subplot5``.
            pdf_name: if not empty, the figure is saved under this name.

        Raises:
            PlottingError: if drawing the canvas fails in interactive mode.
        """
        timestamps = data_frame.TIMESTAMP
        minimum, maximum = self.get_extended_min_max(timestamps)
        for axis in self.ax1:
            axis.set_xlim(minimum, maximum)

        # refresh data for signal0 and signal1 in the first subplot
        self._update_trace(self.ax1[0], self.path_collection_signal0, timestamps,
                           data_frame[self.reference_signal_names[0]])
        self.path_collection_fit.set_offsets(np.c_[[], []])
        self._update_trace(self.signal1_axis, self.path_collection_signal1, timestamps,
                           data_frame[self.reference_signal_names[1]])

        # refresh chamber temperature and humidity in the second subplot
        self._update_trace(self.ax1[1], self.path_collection_temp, timestamps,
                           data_frame.READBACK_TEMPERATURE)
        self._update_trace(self.humidity_axis, self.path_collection_hum, timestamps,
                           data_frame.READBACK_HUMIDITY)

        # refresh temperatures of the external sensors (DUT and room)
        self._update_trace(self.ax1[2], self.path_collection_temp_dut, timestamps,
                           data_frame.TEMP_DUT)
        self._update_trace(self.ax1[3], self.path_collection_temp_room, timestamps,
                           data_frame.TEMP_ROOM)

        # refresh humidities of the external sensors (DUT and room)
        self._update_trace(self.ext_sens_hum_axis, self.path_collection_hum_dut,
                           timestamps, data_frame.HUM_DUT)
        self._update_trace(self.sec_ext_hum_sens_axis, self.path_collection_hum_room,
                           timestamps, data_frame.HUM_ROOM)

        # refresh air pressure of external sensor in subplot for air pressure room
        self._update_trace(self.press_axis, self.path_collection_air_press_room,
                           timestamps, data_frame.AIR_PRESS_ROOM)

        # refresh the two configurable traces of the fifth subplot
        val_trace_1, val_trace_2 = self.refresh_param_fifth_subplot(data_frame)
        self._update_trace(self.ax1[4], self.path_collection_trace_1, timestamps,
                           val_trace_1)
        self._update_trace(self.sec_plot_param_axis, self.path_collection_trace_2,
                           timestamps, val_trace_2)

        # the indicator axes keep their fixed y range, only the points change
        equilibrium = data_frame.EQUILIBRIUM_INDICATOR
        self.path_collection_equi0.set_offsets(np.c_[timestamps, equilibrium])
        self.path_collection_equi1.set_offsets(np.c_[timestamps, equilibrium])

        if pdf_name != '':
            self.fig.savefig(pdf_name)

        # Per the matplotlib documentation, show() returns immediately in
        # interactive mode and blocks until the windows are closed otherwise.
        plt.show()
        if plt.isinteractive():
            try:
                self.fig.canvas.draw()
                self.fig.canvas.flush_events()
            except Exception as e:
                raise PlottingError from e

    def config_fifth_subplot(self):
        """Return legend labels and axis labels of the fifth subplot.

        Returns:
            dict with the keys ``label_trace_1``, ``label_trace_2``, ``y_axis``
            and ``sec_y_axis``, chosen according to ``self.trace_subplot5``.
        """
        # key names for config parameter fifth subplot
        keys = ["label_trace_1", "label_trace_2", "y_axis", "sec_y_axis"]

        if self.trace_subplot5 == 'logger_sens':
            # values for plotting environmental conditions in measurement instrument chamber
            values = ["temperature\nalmemo sensors\nmeas instr chamber",
                      "humidity\nalmemo sensors\nmeas instr chamber",
                      "TEMPERATURE [°C]", "HUMIDITY [%RH]"]
        elif self.trace_subplot5 == 'chamber_sens':
            # values for plotting environmental conditions in measurement instrument chamber
            values = ["temperature\nchamber sensors\nmeas instr chamber",
                      "humidity\nchamber sensors\nmeas instr chamber",
                      "TEMPERATURE [°C]", "HUMIDITY [%RH]"]
        else:
            # values for plotting heater activity in fifth subplot
            values = ["Temp Heater\nDUT chamber", "Humidity Heater\n DUT chamber",
                      "ACTIVITY [%]", "ACTIVITY [%]"]

        # generate dictionary from selection
        return dict(zip(keys, values))

    def refresh_param_fifth_subplot(self, data_frame):
        """Return the two data columns shown in the fifth subplot.

        The columns are selected by ``self.trace_subplot5`` in the same way as
        the labels in ``config_fifth_subplot``.
        """
        if self.trace_subplot5 == 'logger_sens':
            # choose sensor values for refreshing fifth subplot
            val_trace_1 = data_frame.TEMP_MEAS_INSTR
            val_trace_2 = data_frame.HUM_MEAS_INSTR
        elif self.trace_subplot5 == 'chamber_sens':
            val_trace_1 = data_frame.READBACK_TEMP_MEAS_INSTR
            val_trace_2 = data_frame.READBACK_HUM_MEAS_INSTR
        else:
            # choose heater values for refreshing fifth plot
            val_trace_1 = data_frame.TEMP_HEATER
            val_trace_2 = data_frame.HUM_HEATER

        return val_trace_1, val_trace_2

    @staticmethod
    def get_extended_min_max(array):
        """Return ``(min, max)`` of ``array`` widened by 5 % of their distance on each side.

        If all values are equal, a distance of 1 is used so that the returned
        range is never empty.
        """
        lowest = array.min()
        highest = array.max()
        distance = highest - lowest
        if distance == 0.:
            distance = 1
        margin = 0.05 * distance
        return lowest - margin, highest + margin


# test procedure for measurement plot procedure
if __name__ == '__main__':
    # possible selections trace subplot 5
    # chamber_sens
    # logger_sens
    # heater_dut_chamber
    m = MeasurementPlot(['S21_PHASE', 'S21_MAGNITUDE'], trace_subplot5="chamber_sens")
    plt.ion()
    measurements = []

    # FIXME: The loop should run in a separate thread and use draw_in_other_thread
    # generation of datapoints for plot
    for i in range(20):
        measurement = {
            'TIMESTAMP': i,
            'READBACK_TEMPERATURE': 25 - i,
            'READBACK_HUMIDITY': 10 + 0.1*i,
            'EQUILIBRIUM_INDICATOR': i % 4,
            'S21_PHASE': 20 - 2*i,
            'S21_MAGNITUDE': 0.3*i,
            'TEMP_HEATER': 10,
            'HUM_HEATER': 3,
            'TEMP_DUT': i,
            'TEMP_ROOM': 25-i,
            'HUM_DUT': 40,
            'HUM_ROOM': 45,
            'AIR_PRESS_ROOM': 1000+10*i,
            'TEMP_MEAS_INSTR': 40-1.5*i,
            'HUM_MEAS_INSTR': 55,
            'READBACK_TEMP_MEAS_INSTR': 50-2*i,
            'READBACK_HUM_MEAS_INSTR': 39
        }
        measurements.append(measurement)
        my_data_frame = pd.DataFrame(measurements)

        # plot of data frame with test data for actual step
        m.draw_in_this_thread(my_data_frame)

        # plot of step number
        print(str(i))
        time.sleep(0.3)

    print('I am done. ')
    plt.ioff()
    m.draw_in_this_thread(my_data_frame, 'the.pdf')