# MeasurementPlot.py
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import time
import multiprocessing as mp
import queue
import sys
# Different exceptions can be thrown while plotting, depending on the backend.
# We catch them all locally and raise our own exception instead
class PlottingError(Exception):
    """Raised when plotting fails.

    Backend-specific exceptions that occur while drawing are caught locally
    and re-raised as this single exception type.
    """

class MeasurementPlot:
    """Figure with five stacked scatter subplots that is refreshed from data frames.

    Subplots (top to bottom):
      1. the two reference signals plus the equilibrium indicator,
      2. chamber temperature / humidity plus the equilibrium indicator,
      3. DUT temperature / humidity (and DUT2 when the columns are present),
      4. room temperature / humidity / air pressure,
      5. a selectable pair of traces (see ``config_fifth_subplot``).

    Data can be drawn directly with ``draw_in_this_thread`` or handed over via
    ``draw_in_other_thread`` to a consumer running ``drawing_loop``.
    """

    def __init__(self, reference_signal_names, title='', trace_subplot5='', legend_loc='upper left',
                 legend_bbox_to_anchor=(1.09, 1)):
        """Create the figure, all axes, the (empty) scatter collections and the data queue.

        Args:
            reference_signal_names: sequence with the two data-frame column names
                shown in the first subplot (index 0 on the left axis, index 1 on
                the first twin axis).
            title: text appended to "Measurement " in the figure title.
            trace_subplot5: selects what the fifth subplot shows; see
                ``config_fifth_subplot`` for the recognised values.
            legend_loc: ``loc`` argument used for every subplot legend.
            legend_bbox_to_anchor: ``bbox_to_anchor`` argument used for every
                subplot legend.
                NOTE(review): the line defining this parameter is missing from
                the reviewed source; the default (1.09, 1) is a reconstruction
                - confirm against the project history.
        """
        self.reference_signal_names = reference_signal_names
        # Must be stored before config_fifth_subplot() is called further down.
        self.trace_subplot5 = trace_subplot5

        # set python for opening a separate plot window when starting from anaconda
        if 'ipykernel' in sys.modules:
            from IPython import get_ipython
            get_ipython().run_line_magic('matplotlib', 'qt')

        # parameter for legend of subplots
        self.legend_loc = legend_loc
        self.legend_bbox_to_anchor = legend_bbox_to_anchor
        # Prepare for five subplots
        self.fig, self.ax1 = plt.subplots(5, figsize=(25, 20))
        self.fig.subplots_adjust(bottom=0.1, right=0.8, hspace=0.4)
        self.fig.suptitle("Measurement "+title, color="red")

        # First plot: signal0 and signal1
        self.path_collection_signal0 = self.ax1[0].scatter([], [], c='red', marker='<',
                                                           label=reference_signal_names[0])
        self.path_collection_fit = self.ax1[0].scatter([], [], c='green', marker='.', label=' ')
        self.signal1_axis = self.ax1[0].twinx()
        self.path_collection_signal1 = self.signal1_axis.scatter([], [], c='#3120E0', marker='4',
                                                                 label=reference_signal_names[1])
        self.equi_axis0 = self.ax1[0].twinx()
        # move the third y axis outwards so it does not overlap the second one
        self.equi_axis0.spines['right'].set_position(('outward', 75))
        self.path_collection_equi0 = self.equi_axis0.scatter([], [], c='black', marker=".",
                                                             label='Equilibrium_Indicator')

        self.ax1[0].set_xlabel("TIMESTAMP")
        self.ax1[0].set_ylabel(reference_signal_names[0], color='red')
        self.signal1_axis.set_ylabel(reference_signal_names[1], color='#3120E0')
        self.equi_axis0.set_ylabel("INDICATOR VALUE", color='black')
        # fix range to 0..31 with some extra margin for plotting
        self.equi_axis0.set_ylim(-1, 32)

        self.ax1[0].grid(True, linestyle=":")
        self.signal0_legend = self._attach_legend(
            self.ax1[0],
            [self.path_collection_signal0, self.path_collection_signal1,
             self.path_collection_equi0, self.path_collection_fit])
        # The original referenced an undefined name ``ax`` here; the annotation
        # belongs to the first subplot.
        self.annotation = self.ax1[0].annotate('', xy=(0, 1),
                                               xycoords='axes fraction', xytext=(-0.16, 1),
                                               textcoords='axes fraction', fontsize='16',
                                               horizontalalignment='left', verticalalignment='bottom')

        # Second plot: Humidity and temperature of climate chamber requested from internal sensors of chamber
        self.path_collection_temp = self.ax1[1].scatter([], [], c='blue', marker='p', label="Chamber Temperature")
        self.humidity_axis = self.ax1[1].twinx()
        self.path_collection_hum = self.humidity_axis.scatter([], [], c='green', marker="*",
                                                              label="Chamber Humidity")
        self.equi_axis1 = self.ax1[1].twinx()
        self.equi_axis1.spines['right'].set_position(('outward', 75))
        self.path_collection_equi1 = self.equi_axis1.scatter([], [], c='black', marker=".",
                                                             label="Equilibrium Indicator")

        self.ax1[1].set_xlabel("TIMESTAMP")
        self.ax1[1].set_ylabel("TEMPERATURE [°C] ", color='blue')
        self.humidity_axis.set_ylabel("HUMIDITY [%RH]", color='green')
        self.equi_axis1.set_ylabel("INDICATOR VALUE", color='black')
        self.equi_axis1.set_ylim(-1, 32)

        self.ax1[1].grid(True, linestyle=":")
        self._attach_legend(
            self.ax1[1],
            [self.path_collection_temp, self.path_collection_hum, self.path_collection_equi1])

        # Third plot: parameter of external sensors DUT temperature, DUT humidity
        self.path_collection_temp_dut = self.ax1[2].scatter([], [], c='red', marker='p', label='DUT temperature')
        # NOTE(review): the label below contains a typo ("temeprature"); it is kept
        # unchanged here because it is runtime text shown in the legend.
        self.path_collection_temp_dut2 = self.ax1[2].scatter([], [], c='green', marker='.',
                                                             label='DUT2 temeprature')

        self.ext_sens_hum_axis = self.ax1[2].twinx()
        self.path_collection_hum_dut = self.ext_sens_hum_axis.scatter([], [], c='purple', marker='*',
                                                                      label='DUT humidity')
        self.path_collection_hum_dut2 = self.ext_sens_hum_axis.scatter([], [], c='grey', marker='4',
                                                                       label='DUT2 humidity')

        self.ax1[2].set_xlabel("TIMESTAMP")
        self.ax1[2].set_ylabel("TEMPERATURE [°C]", color='red')
        self.ext_sens_hum_axis.set_ylabel("HUMIDITY [%RH]", color='purple')

        self.ax1[2].grid(True, linestyle=":")
        self._attach_legend(
            self.ax1[2],
            [self.path_collection_temp_dut, self.path_collection_hum_dut,
             self.path_collection_temp_dut2, self.path_collection_hum_dut2])

        # Fourth plot: parameter of external sensors: room temperature, room humidity, air pressure room
        self.path_collection_temp_room = self.ax1[3].scatter([], [], c='green', marker='*',
                                                             label='room temperature')

        self.sec_ext_hum_sens_axis = self.ax1[3].twinx()
        self.path_collection_hum_room = self.sec_ext_hum_sens_axis.scatter([], [], c='orange', marker='>',
                                                                           label='room humidity')

        self.press_axis = self.ax1[3].twinx()
        self.press_axis.spines['right'].set_position(('outward', 60))
        self.path_collection_air_press_room = self.press_axis.scatter([], [], c='grey', marker='4',
                                                                      label='air pressure room')

        self.ax1[3].set_xlabel("TIMESTAMP")
        self.ax1[3].set_ylabel("TEMPERATURE [°C]", color='green')
        self.sec_ext_hum_sens_axis.set_ylabel("HUMIDITY [%RH]", color='orange')
        self.press_axis.set_ylabel("AIR PRESSURE [mb]", color='grey')
        self.ax1[3].grid(True, linestyle=":")
        self._attach_legend(
            self.ax1[3],
            [self.path_collection_temp_room, self.path_collection_hum_room,
             self.path_collection_air_press_room])

        # Fifth plot: two traces selected via trace_subplot5
        subplot_dict = self.config_fifth_subplot()
        self.path_collection_trace_1 = self.ax1[4].scatter([], [], c='black', marker='p',
                                                           label=subplot_dict.get('label_trace_1'))
        self.sec_plot_param_axis = self.ax1[4].twinx()
        self.path_collection_trace_2 = self.sec_plot_param_axis.scatter([], [], c='brown', marker="*",
                                                                        label=subplot_dict.get('label_trace_2'))
        self.ax1[4].set_xlabel("TIMESTAMP")
        self.ax1[4].set_ylabel(subplot_dict.get('y_axis'), color='black')
        self.sec_plot_param_axis.set_ylabel(subplot_dict.get('sec_y_axis'), color='brown')
        self._attach_legend(self.ax1[4], [self.path_collection_trace_1, self.path_collection_trace_2])

        plt.rcParams.update({'font.size': 16})
        # Bounded queue (10 entries) created from the multiprocessing 'spawn' context;
        # it carries (data_frame, pdf_name, stop_flag) tuples to drawing_loop().
        ctx = mp.get_context('spawn')
        self.data_queue = ctx.Queue(10)

    def _attach_legend(self, axis, path_collections):
        """Create one combined legend for *path_collections* on *axis* and return it."""
        labels = [pc.get_label() for pc in path_collections]
        return axis.legend(path_collections, labels, loc=self.legend_loc,
                           bbox_to_anchor=self.legend_bbox_to_anchor)

    def stop(self):
        """Queue the stop marker that makes ``drawing_loop`` return."""
        self.data_queue.put((None, None, True))

    def draw_in_other_thread(self, data_frame, pdf_name=''):
        """Queue a copy of *data_frame* for ``drawing_loop``; drop it if the queue is full.

        Args:
            data_frame: data to plot (see ``draw_in_this_thread`` for the columns read).
            pdf_name: file name passed on with the frame; '' means "do not save".
        """
        # A shallow copy is enough (although not strictly 100 % thread safe). Data is only appended and
        # the data members are not altered.
        frame_copy = data_frame.copy()
        try:
            self.data_queue.put_nowait((frame_copy, pdf_name, False))
        except queue.Full:
            # Deliberate best effort: when the consumer lags behind, this frame is skipped.
            pass

    @staticmethod
    def window_closed(event):
        """Matplotlib 'close_event' callback: request the measurement to stop."""
        print('Plot window closed. Stopping measurement.')
        # NOTE(review): ``process_sync`` is not imported in the visible part of this
        # file - presumably a project module holding the shared event; confirm.
        process_sync.stop_measurement.set()

    def drawing_loop(self, stop_measurement_event):
        """Consume the data queue and redraw until the stop marker arrives.

        Args:
            stop_measurement_event: event object; it is set here when drawing fails
                and (via ``window_closed``) when the plot window is closed. Once it
                is set, queued frames are still drained but no longer drawn.
        """
        # turn interactive plotting (back) on (is deactivated by starting the process)
        plt.ion()
        # set the global stop_measurement event so it is known in the window_closed callback
        process_sync.stop_measurement = stop_measurement_event
        window_close_callback = self.fig.canvas.mpl_connect('close_event', MeasurementPlot.window_closed)

        while True:
            if self.data_queue.empty():
                time.sleep(0.1)
                if not stop_measurement_event.is_set():
                    # the canvas might have gone and set stop_measurement_event when closing
                    self.fig.canvas.flush_events()
                continue

            data_frame, pdf_name, stop_drawing = self.data_queue.get()
            if stop_drawing:
                self.fig.canvas.mpl_disconnect(window_close_callback)
                return
            try:
                if not stop_measurement_event.is_set():
                    # Forward the queued pdf_name (the original always passed '',
                    # silently discarding the requested file name).
                    self.draw_in_this_thread(data_frame, pdf_name)
            except Exception as e:
                # Don't exit here. Always drain the data queue so the program can terminate correctly
                print('Plotting failed: ' + repr(e), file=sys.stderr)
                stop_measurement_event.set()

    def _update_trace(self, axis, path_collection, timestamps, values):
        """Rescale the y range of *axis* to *values* and move the scatter points."""
        minimum, maximum = self.get_extended_min_max(values)
        axis.set_ylim(minimum, maximum)
        path_collection.set_offsets(np.c_[timestamps, values])

    def draw_in_this_thread(self, data_frame, pdf_name=''):
        """Refresh all subplots from *data_frame* and redraw the figure.

        Columns read: TIMESTAMP, the two reference signal names, READBACK_TEMPERATURE,
        READBACK_HUMIDITY, TEMP_DUT, HUM_DUT, TEMP_ROOM, HUM_ROOM, AIR_PRESS_ROOM,
        EQUILIBRIUM_INDICATOR and the columns chosen by ``refresh_param_fifth_subplot``.
        TEMP_DUT2 / HUM_DUT2 are plotted only when present.

        Args:
            data_frame: pandas data frame with the columns listed above.
            pdf_name: when non-empty, the figure is saved under this name.

        Raises:
            PlottingError: when drawing the canvas fails in interactive mode.
        """
        timestamps = data_frame.TIMESTAMP
        minimum, maximum = self.get_extended_min_max(timestamps)
        for axis in self.ax1:
            axis.set_xlim(minimum, maximum)

        # first subplot: reference signals (the fit trace is kept empty)
        self._update_trace(self.ax1[0], self.path_collection_signal0, timestamps,
                           data_frame[self.reference_signal_names[0]])
        self.path_collection_fit.set_offsets(np.c_[[], []])
        self._update_trace(self.signal1_axis, self.path_collection_signal1, timestamps,
                           data_frame[self.reference_signal_names[1]])

        # second subplot: chamber temperature and humidity
        self._update_trace(self.ax1[1], self.path_collection_temp, timestamps,
                           data_frame.READBACK_TEMPERATURE)
        self._update_trace(self.humidity_axis, self.path_collection_hum, timestamps,
                           data_frame.READBACK_HUMIDITY)

        # third subplot: DUT temperature / humidity from the external sensors
        temp_dut = data_frame.TEMP_DUT
        hum_dut = data_frame.HUM_DUT
        self.path_collection_temp_dut.set_offsets(np.c_[timestamps, temp_dut])
        self.path_collection_hum_dut.set_offsets(np.c_[timestamps, hum_dut])
        # The DUT2 traces share the axes with the DUT traces, so the y ranges must
        # cover both. The original read TEMP_DUT2 but never plotted it.
        temp_for_limits = temp_dut
        if 'TEMP_DUT2' in data_frame:
            temp_dut2 = data_frame.TEMP_DUT2
            self.path_collection_temp_dut2.set_offsets(np.c_[timestamps, temp_dut2])
            temp_for_limits = pd.concat([temp_dut, temp_dut2])
        hum_for_limits = hum_dut
        # column name HUM_DUT2 is assumed by analogy to TEMP_DUT2 - TODO confirm
        if 'HUM_DUT2' in data_frame:
            hum_dut2 = data_frame.HUM_DUT2
            self.path_collection_hum_dut2.set_offsets(np.c_[timestamps, hum_dut2])
            hum_for_limits = pd.concat([hum_dut, hum_dut2])
        minimum, maximum = self.get_extended_min_max(temp_for_limits)
        self.ax1[2].set_ylim(minimum, maximum)
        minimum, maximum = self.get_extended_min_max(hum_for_limits)
        self.ext_sens_hum_axis.set_ylim(minimum, maximum)

        # fourth subplot: room temperature, humidity and air pressure
        self._update_trace(self.ax1[3], self.path_collection_temp_room, timestamps,
                           data_frame.TEMP_ROOM)
        self._update_trace(self.sec_ext_hum_sens_axis, self.path_collection_hum_room, timestamps,
                           data_frame.HUM_ROOM)
        self._update_trace(self.press_axis, self.path_collection_air_press_room, timestamps,
                           data_frame.AIR_PRESS_ROOM)

        # fifth subplot: the two selectable traces
        val_trace_1, val_trace_2 = self.refresh_param_fifth_subplot(data_frame)
        self._update_trace(self.ax1[4], self.path_collection_trace_1, timestamps, val_trace_1)
        self._update_trace(self.sec_plot_param_axis, self.path_collection_trace_2, timestamps, val_trace_2)

        # equilibrium indicator is shown in the first two subplots (fixed y range)
        equilibrium = data_frame.EQUILIBRIUM_INDICATOR
        self.path_collection_equi0.set_offsets(np.c_[timestamps, equilibrium])
        self.path_collection_equi1.set_offsets(np.c_[timestamps, equilibrium])

        if pdf_name:
            self.fig.savefig(pdf_name)

        plt.show()
        if plt.isinteractive():
            try:
                self.fig.canvas.draw()
                self.fig.canvas.flush_events()
            except Exception as e:
                # Backends raise different exception types; unify them.
                raise PlottingError from e

    def config_fifth_subplot(self):
        """Return labels and axis titles for the fifth subplot.

        The choice depends on ``self.trace_subplot5``:
        'logger_sens', 'chamber_sens', anything else -> heater activity.

        NOTE(review): the branch conditions were missing from the reviewed
        source; the mapping of selection names to value sets is reconstructed
        from the selection names listed in the test procedure - confirm.

        Returns:
            dict with the keys "label_trace_1", "label_trace_2", "y_axis", "sec_y_axis".
        """
        # key names for config parameter fifth subplot
        keys = ["label_trace_1", "label_trace_2", "y_axis", "sec_y_axis"]

        if self.trace_subplot5 == 'logger_sens':
            # values for plotting environmental conditions in measurement instrument chamber
            values = ["temperature\nalmemo sensors\nmeas instr chamber",
                      "humidity\nalmemo sensors\nmeas instr chamber",
                      "TEMPERATURE [°C]", "HUMIDITY [%RH]"]
        elif self.trace_subplot5 == 'chamber_sens':
            # values for plotting environmental conditions in measurement instrument chamber
            values = ["temperature\nchamber sensors\nmeas instr chamber",
                      "humidity\nchamber sensors\nmeas instr chamber",
                      "TEMPERATURE [°C]", "HUMIDITY [%RH]"]
        else:
            # values for plotting heater activity in fifth subplot
            values = ["Temp Heater\nDUT chamber", "Humidity Heater\n DUT chamber",
                      "ACTIVITY [%]", "ACTIVITY [%]"]
        # generate dictionary from selection
        return dict(zip(keys, values))

    def refresh_param_fifth_subplot(self, data_frame):
        """Return the two data-frame columns shown in the fifth subplot.

        Uses the same selection (``self.trace_subplot5``) as
        ``config_fifth_subplot``; the same reconstruction caveat applies.

        Args:
            data_frame: data frame holding the selected pair of columns.

        Returns:
            tuple ``(val_trace_1, val_trace_2)``.
        """
        if self.trace_subplot5 == 'logger_sens':
            # chose sensor values for refreshing fifth subplot
            return data_frame.TEMP_MEAS_INSTR, data_frame.HUM_MEAS_INSTR
        if self.trace_subplot5 == 'chamber_sens':
            return data_frame.READBACK_TEMP_MEAS_INSTR, data_frame.READBACK_HUM_MEAS_INSTR
        # chose heater values for refreshing fifth plot
        return data_frame.TEMP_HEATER, data_frame.HUM_HEATER

    # add 5 % of the distance between min and max to the range
    @staticmethod
    def get_extended_min_max(array):
        """Return ``(min, max)`` of *array* widened by 5 % of the range on each side.

        A constant series has zero range; a range of 1 is used instead so the
        resulting axis limits are never identical.
        """
        lowest = array.min()
        highest = array.max()
        distance = highest - lowest
        if distance == 0:
            distance = 1
        return lowest - 0.05*distance, highest + 0.05*distance

# test procedure for measurement plot procedure
if __name__ == '__main__':
    # Manual test: feed 20 synthetic measurement steps into the plot, one every
    # 0.3 s, then save the final figure to 'the.pdf'.
    #
    # possible selections trace subplot 5
    # chamber_sens
    # logger_sens
    # heater_dut_chamber

    m = MeasurementPlot(['S21_PHASE', 'S21_MAGNITUDE'], trace_subplot5="chamber_sens")
    plt.ion()
    measurements = []

    # FIXME: The loop should run in a separate thread and use draw_in_other_thread
    # generation of datapoints for plot
    for i in range(20):
        measurement = {
            'TIMESTAMP': i,
            'READBACK_TEMPERATURE': 25 - i,
            'READBACK_HUMIDITY': 10 + 0.1*i,
            'EQUILIBRIUM_INDICATOR': i % 4,
            'S21_PHASE': 20 - 2*i,
            'S21_MAGNITUDE': 0.3*i,
            'TEMP_DUT': i,
            'TEMP_ROOM': 25-i,
            'HUM_DUT': 40,
            'HUM_ROOM': 45,
            'AIR_PRESS_ROOM': 1000+10*i,
            'TEMP_MEAS_INSTR': 40-1.5*i,
            'HUM_MEAS_INSTR': 55,
            'READBACK_TEMP_MEAS_INSTR': 50-2*i,
            'READBACK_HUM_MEAS_INSTR': 39,
            # draw_in_this_thread reads TEMP_DUT2; HUM_DUT2 is added by analogy
            # (column name assumed - TODO confirm)
            'TEMP_DUT2': i + 1,
            'HUM_DUT2': 42,
            # columns read when the 'heater_dut_chamber' selection is used
            'TEMP_HEATER': 5*i,
            'HUM_HEATER': 100 - 5*i,
        }
        measurements.append(measurement)
        my_data_frame = pd.DataFrame(measurements)
        # plot of data frame with test data for actual step
        m.draw_in_this_thread(my_data_frame)
        # plot of step number
        print(i)
        time.sleep(0.3)

    print('I am done. ')
    # switch interactive mode off before the final draw that also writes the PDF
    plt.ioff()
    m.draw_in_this_thread(my_data_frame, 'the.pdf')