# climate-lab-gui.py
# Committed by Martin Killenberg.
import json
import multiprocessing as mp
import os
import sys
import time

from PyQt5.QtWidgets import QMainWindow, QApplication, QPushButton, QTextEdit
from PyQt5 import uic, QtCore, QtGui, QtWidgets

import prototype
import analysis
import process_sync


# Main window of the test stand GUI; its widgets are loaded from 'climate-lab-main.ui'.
class TestStandMainWindow(QMainWindow):
    def __init__(self, qt_app, *args, **kwargs):
        """Load the UI description and connect the button handlers.

        Args:
            qt_app: The application object. It is kept on the instance because
                do_measurement() calls processEvents() on it while a
                measurement is running.
            *args: Forwarded unchanged to the QMainWindow constructor.
            **kwargs: Forwarded unchanged to the QMainWindow constructor.
        """
        super().__init__(*args, **kwargs)
        # do_measurement() reads self.qt_app, so it must be stored here.
        self.qt_app = qt_app
        uic.loadUi('climate-lab-main.ui', self)

        self.startButton.clicked.connect(lambda: self.do_measurement())
        self.stopButton.clicked.connect(lambda: self.stop_measurement())
        self.tempSweepButton.clicked.connect(lambda: self.set_temp_sweep_labels())
        self.humSweepButton.clicked.connect(lambda: self.set_hum_sweep_labels())
        # check_and_change_dir() changes the working directory; do_measurement()
        # returns to this directory before each run.
        self.start_dir = os.getcwd()

    def set_temp_sweep_labels(self):
        """Enable the parameter widgets and label them for a temperature sweep.

        The start/stop/step group is titled as temperature and the fixed
        parameter is labelled as humidity.
        """
        swept_title, fixed_text = 'Temperature [degC]', 'Humidity [% r.h.]'
        self.parametersGroupBox.setEnabled(True)
        self.startStopStepGroupBox.setTitle(swept_title)
        self.fixedParameterLabel.setText(fixed_text)

    def set_hum_sweep_labels(self):
        """Enable the parameter widgets and label them for a humidity sweep.

        The start/stop/step group is titled as humidity and the fixed
        parameter is labelled as temperature.
        """
        swept_title, fixed_text = 'Humidity [% r.h.]', 'Temperature [degC]'
        self.parametersGroupBox.setEnabled(True)
        self.startStopStepGroupBox.setTitle(swept_title)
        self.fixedParameterLabel.setText(fixed_text)
    def check_and_change_dir(self, data_folder):
        """Make ``data_folder`` the working directory, creating it if necessary.

        If the folder already exists, the user is asked whether the data in it
        may be overwritten. If the path exists but is not a directory, or the
        folder cannot be created, an error dialog is shown.

        Args:
            data_folder: Path of the folder to change into.

        Returns:
            True if the working directory was changed to ``data_folder``,
            False if the user declined or the folder is not usable.
        """
        if os.path.isdir(data_folder):
            button_reply = QtWidgets.QMessageBox.question(self, 'Warning: \'' + data_folder + '\' already exists',
                                                          'Overwrite data in \'' + data_folder + '\'?',
                                                          QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
                                                          QtWidgets.QMessageBox.No)
            if button_reply != QtWidgets.QMessageBox.Yes:
                return False
        elif os.path.exists(data_folder):
            QtWidgets.QMessageBox.critical(self, 'Error', '\'' + data_folder + '\' exists but is not a directory!')
            return False
        else:
            try:
                os.makedirs(data_folder)
            except OSError:
                # Only file system failures are expected here; a bare except
                # would also swallow KeyboardInterrupt and programming errors.
                QtWidgets.QMessageBox.critical(self, 'Error', 'Cannot create folder \'' + data_folder + '\'!')
                return False

        os.chdir(data_folder)
        print('changed dir')
        return True

    def get_analysis_config(self, time_string):
        """Collect the analysis settings from the GUI into a dictionary.

        Args:
            time_string: Time stamp string stored under 'time_string'.

        Returns:
            A dict with the keys 'dut_name', 'time_string', 'type',
            'extra_signal_names' and 'normalise'. When the cable analysis
            button is checked, the type is 'rf_cable' and 'cable_length' is
            added; otherwise the type is 'default'.
        """
        config = {
            'dut_name': self.dutName.text(),
            'time_string': time_string,
        }

        if not self.analysisTypeCableButton.isChecked():
            config.update({
                'type': 'default',
                'extra_signal_names': [],
                'normalise': [self.removeOffsetsSignal0.isChecked(), self.removeOffsetsSignal1.isChecked()],
            })
            return config

        config.update({
            'type': 'rf_cable',
            'extra_signal_names': ['RF_FREQUENCY'],
            'normalise': [True, False],
            'cable_length': self.cableLengthSpinBox.value(),
        })
        return config

    def stop_measurement(self):
        """Request a stop by setting the process_sync.stop_measurement event."""
        stop_event = process_sync.stop_measurement
        stop_event.set()

    def do_measurement(self):
        """Run one measurement in a separate process, configured from the GUI.

        The run settings and the start button are disabled while the
        measurement runs and are re-enabled afterwards, including when the run
        is cancelled or an exception is raised. 'test_stand_parameter.json'
        and 'ext_sensor_channels.json' are read from the start directory. The
        output folder is config_data['data_folder'] + '/' + output_basename;
        the user is asked before an existing folder is reused. The measurement
        itself runs in do_measurement_impl() in a spawned process while this
        method keeps processing Qt events. Messages the child put on the error
        queue are shown in error dialogs once it has finished.
        """
        self.runSettingsWidget.setEnabled(False)
        self.startButton.setEnabled(False)
        self.stopButton.setEnabled(True)
        self.qt_app.processEvents()

        try:
            # get parameters for measurement from GUI
            os.chdir(self.start_dir)
            time_string = time.strftime("%Y_%m_%d-%H_%M_%S")
            if self.autoNameCheckbox.isChecked():
                output_basename = time_string + "_results"
            else:
                output_basename = self.baseName.text()
            with open('test_stand_parameter.json', 'r') as f:
                config_data = json.load(f)

            with open('ext_sensor_channels.json', 'r') as f2:
                ext_sensor_channels = json.load(f2)

            # Determine the mode before touching the file system so that no
            # output folder is created for a run that cannot start.
            if self.tempSweepButton.isChecked():
                measurement_type = 'temperature'
            elif self.humSweepButton.isChecked():
                measurement_type = 'humidity'
            elif self.measurementFileButton.isChecked():
                measurement_type = 'file'
            else:
                QtWidgets.QMessageBox.critical(self, 'Error', 'No measurement type selected!')
                return

            if not self.check_and_change_dir(config_data['data_folder'] + '/' + output_basename):
                # User declined or the folder is unusable: cancel this run.
                return

            ctx = mp.get_context('spawn')
            error_queue = ctx.Queue(3)

            p = ctx.Process(target=TestStandMainWindow.do_measurement_impl,
                            args=(measurement_type, output_basename, self.startParameter.value(),
                                  self.stopParameter.value(), self.stepParameter.value(),
                                  self.fixedParameter.value(), self.soakingTime.value(),
                                  self.stableReads.value(), self.get_analysis_config(time_string),
                                  os.path.join(self.start_dir, self.measurementFile.text()), error_queue,
                                  config_data, ext_sensor_channels, process_sync.stop_measurement))

            p.start()
            # Poll instead of blocking in join() so the GUI (in particular the
            # stop button) stays responsive while the child is running.
            while p.is_alive():
                time.sleep(0.1)
                self.qt_app.processEvents()
            p.join()

            while not error_queue.empty():
                QtWidgets.QMessageBox.critical(self, 'Error', error_queue.get())
        finally:
            self.runSettingsWidget.setEnabled(True)
            self.startButton.setEnabled(True)
            self.stopButton.setEnabled(False)


    #the part that is started in a separate process
    @staticmethod
    def do_measurement_impl(measurement_type, output_basename, start_parameter, stop_parameter, step_parameter,
                            fixed_parameter, soaking_time, stable_reads, analysis_config, measurement_file, error_queue,
                            config_data, ext_sensor_channels, stop_measurement_event):
        """Perform the measurement and plot the results (runs in the child process).

        Args:
            measurement_type: 'temperature', 'humidity' or 'file'.
            output_basename: Base name passed to the measurement and plot calls.
            start_parameter: Sweep start value (sweep types only).
            stop_parameter: Sweep stop value (sweep types only).
            step_parameter: Sweep step (sweep types only).
            fixed_parameter: Value of the quantity that is not swept.
            soaking_time: Passed through to perform_sweep().
            stable_reads: Passed through to perform_sweep().
            analysis_config: Dict passed through to analysis.plot_sweep().
            measurement_file: Path handed to perform_measurements() for the
                'file' type.
            error_queue: Queue on which error messages for the GUI process
                are put as strings.
            config_data: Test stand configuration passed to
                prototype.Measurements and prototype.plot_output.
            ext_sensor_channels: External sensor channel configuration passed
                to prototype.Measurements and prototype.plot_output.
            stop_measurement_event: Event object of the GUI process; installed
                as process_sync.stop_measurement in this process.
        """
        # The caller starts this function with the 'spawn' start method, so the
        # event received as an argument is installed here to make
        # process_sync.stop_measurement refer to the GUI process' event.
        process_sync.stop_measurement = stop_measurement_event

        meas = None
        try:
            meas = prototype.Measurements(config_data, output_basename, False, ext_sensor_channels)
            if measurement_type == 'temperature':
                temperatures = meas.perform_sweep(start_parameter, stop_parameter, step_parameter, fixed_parameter,
                                                  soaking_time, stable_reads,
                                                  'temperature')
                temp_extensions = [str(t) + 'deg_' + str(fixed_parameter) + 'rh' for t in temperatures]
                analysis.plot_sweep(temperatures, [fixed_parameter] * len(temperatures), output_basename,
                                    'temperature', meas.dut.get_measurement_set_names(),
                                    meas.dut.get_dut_reference_signal_names(), analysis_config)

                prototype.plot_output(output_basename, temp_extensions, True, config_data, ext_sensor_channels,
                                      meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(),
                                      output_basename + ': Temperature sweep ' + str(temperatures[0]) + '--' +
                                      str(temperatures[-1]) + ' degC @ ' + str(fixed_parameter) + ' % r.h.')
            elif measurement_type == 'humidity':
                humidities = meas.perform_sweep(start_parameter, stop_parameter, step_parameter, fixed_parameter,
                                                soaking_time, stable_reads, 'humidity')
                hum_extensions = [str(fixed_parameter) + 'deg_' + str(h) + 'rh' for h in humidities]
                analysis.plot_sweep([fixed_parameter] * len(humidities), humidities, output_basename,
                                    'humidity', meas.dut.get_measurement_set_names(),
                                    meas.dut.get_dut_reference_signal_names(), analysis_config)

                prototype.plot_output(output_basename, hum_extensions, True, config_data, ext_sensor_channels,
                                      meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(),
                                      output_basename + ': Humidity sweep ' + str(humidities[0]) + '--' +
                                      str(humidities[-1]) + ' % r.h. @ ' + str(fixed_parameter) + ' degC')
            elif measurement_type == 'file':
                try:
                    n_measurements = meas.perform_measurements(measurement_file)
                    prototype.plot_output(output_basename, range(n_measurements), True, config_data,
                                          ext_sensor_channels,
                                          meas.dut.get_measurement_set_names(),
                                          meas.dut.get_dut_reference_signal_names(),
                                          output_basename)
                except FileNotFoundError as e:
                    # Report the missing file to the GUI instead of failing silently.
                    error_queue.put(str(e))
        except Exception as e:
            # Process boundary: report the failure to the GUI, then re-raise so
            # the traceback is still printed by the child process.
            error_queue.put(type(e).__name__ + ': ' + str(e))
            raise
        finally:
            if meas is not None:
                # Close the second device even if closing the first one fails.
                try:
                    meas.chamber.close()
                finally:
                    meas.ext_sensors.close()

if __name__ == '__main__':
    # Build the application and main window only when run as a script, not
    # when this module is imported.
    app = QApplication(sys.argv)
    mainWindow = TestStandMainWindow(app)
    mainWindow.show()
    # Hand the event loop's return code back to the caller as the exit status.
    sys.exit(app.exec_())