#!/usr/bin/python3 import os.path from PyQt5.QtWidgets import QMainWindow, QApplication, QPushButton, QTextEdit from PyQt5 import uic, QtCore, QtGui, QtWidgets import sys import prototype import json import time import analysis import process_sync import multiprocessing as mp class TestStandMainWindow(QMainWindow): def __init__(self, qt_app, *args, **kwargs): super().__init__(*args, **kwargs) uic.loadUi('climate-lab-main.ui', self) self.qt_app = qt_app self.startButton.clicked.connect(lambda: self.do_measurement()) self.stopButton.clicked.connect(lambda: self.stop_measurement()) self.tempSweepButton.clicked.connect(lambda: self.set_temp_sweep_labels()) self.humSweepButton.clicked.connect(lambda: self.set_hum_sweep_labels()) self.start_dir = os.getcwd() def set_temp_sweep_labels(self): self.parametersGroupBox.setEnabled(True) self.startStopStepGroupBox.setTitle('Temperature [degC]') self.fixedParameterLabel.setText('Humidity [% r.h.]') def set_hum_sweep_labels(self): self.parametersGroupBox.setEnabled(True) self.startStopStepGroupBox.setTitle('Humidity [% r.h.]') self.fixedParameterLabel.setText('Temperature [degC]') def check_and_change_dir(self, data_folder): if os.path.exists(data_folder): if os.path.isdir(data_folder): button_reply = QtWidgets.QMessageBox.question(self, 'Warning: \'' + data_folder + '\' already exists', 'Overwrite data in \'' + data_folder + '\'?', QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No) if not button_reply == QtWidgets.QMessageBox.Yes: return False else: QtWidgets.QMessageBox.critical(self, 'Error', '\'' + data_folder + '\' exists but is not a directory!') return False else: try: os.makedirs(data_folder) except: QtWidgets.QMessageBox.critical(self, 'Error', 'Cannot create folder \'' + data_folder + '\'!') return False os.chdir(data_folder) print('changed dir') return True def get_analysis_config(self, time_string): analysis_config = {} analysis_config['dut_name'] = self.dutName.text() analysis_config['time_string'] = time_string if self.analysisTypeCableButton.isChecked(): analysis_config['type'] = 'rf_cable' analysis_config['extra_signal_names'] = ['RF_FREQUENCY'] analysis_config['normalise'] = [True, False] analysis_config['cable_length'] = self.cableLengthSpinBox.value() else: analysis_config['type'] = 'default' analysis_config['extra_signal_names'] = [] analysis_config['normalise'] = [self.removeOffsetsSignal0.isChecked(), self.removeOffsetsSignal1.isChecked()] return analysis_config def stop_measurement(self): process_sync.stop_measurement.set() def do_measurement(self): self.runSettingsWidget.setEnabled(False) self.startButton.setEnabled(False) self.stopButton.setEnabled(True) self.qt_app.processEvents(); # get parameters for measurement from GUI os.chdir(self.start_dir) time_string = time.strftime("%Y_%m_%d-%H_%M_%S") if self.autoNameCheckbox.isChecked(): output_basename = time_string + "_results" else: output_basename = self.baseName.text() with open('test_stand_parameter.json', 'r') as f: config_data = json.load(f) with open('ext_sensor_channels.json', 'r') as f2: ext_sensor_channels = json.load(f2) if not self.check_and_change_dir(config_data['data_folder'] + '/' + output_basename): return if self.tempSweepButton.isChecked(): measurement_type = 'temperature' elif self.humSweepButton.isChecked(): measurement_type = 'humidity' elif self.measurementFileButton.isChecked(): measurement_type = 'file' ctx = mp.get_context('spawn') error_queue = ctx.Queue(3) p = ctx.Process(target=TestStandMainWindow.do_measurement_impl, args=(measurement_type, output_basename, self.startParameter.value(), self.stopParameter.value(), self.stepParameter.value(), self.fixedParameter.value(), self.soakingTime.value(), self.stableReads.value(), self.get_analysis_config(time_string), os.path.join(self.start_dir, self.measurementFile.text()), error_queue, config_data, ext_sensor_channels, process_sync.stop_measurement)) p.start() while p.is_alive(): time.sleep(0.1) self.qt_app.processEvents(); p.join() while not error_queue.empty(): QtWidgets.QMessageBox.critical(self, 'Error', error_queue.get()) self.runSettingsWidget.setEnabled(True) self.startButton.setEnabled(True) self.stopButton.setEnabled(False) #the part that is started in a separate process @staticmethod def do_measurement_impl(measurement_type, output_basename, start_parameter, stop_parameter, step_parameter, fixed_parameter, soaking_time, stable_reads, analysis_config, measurement_file, error_queue, config_data, ext_sensor_channels, stop_measurement_event): process_sync.stop_measurement = stop_measurement_event meas = None try: meas = prototype.Measurements(config_data, output_basename,False, ext_sensor_channels) if measurement_type == 'temperature': temperatures = meas.perform_sweep(start_parameter, stop_parameter, step_parameter, fixed_parameter, soaking_time, stable_reads, 'temperature') temp_extensions = [] for t in temperatures: temp_extensions.append(str(t) + 'deg_' + str(fixed_parameter) + 'rh') analysis.plot_sweep(temperatures, [fixed_parameter] * len(temperatures), output_basename, 'temperature', meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(), analysis_config) prototype.plot_output(output_basename, temp_extensions, True, config_data, ext_sensor_channels, meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(), output_basename + ': Temperature sweep ' + str(temperatures[0]) + '--' + str(temperatures[-1]) + ' degC @ ' + str(fixed_parameter) + ' % r.h.') elif measurement_type == 'humidity': humidities = meas.perform_sweep(start_parameter, stop_parameter, step_parameter, fixed_parameter, soaking_time, stable_reads, 'humidity') hum_extensions = [] for h in humidities: hum_extensions.append(str(fixed_parameter) + 'deg_' + str(h) + 'rh') analysis.plot_sweep([fixed_parameter] * len(humidities), humidities, output_basename, 'humidity', meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(), analysis_config) prototype.plot_output(output_basename, hum_extensions, True, config_data, ext_sensor_channels, meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(), output_basename + ': Humidity sweep ' + str(humidities[0]) + '--' + str(humidities[-1]) + ' % r.h. @ ' + str(fixed_parameter) + ' degC') elif measurement_type == 'file': try: n_measurements = meas.perform_measurements(measurement_file) prototype.plot_output(output_basename, range(n_measurements), True, config_data, ext_sensor_channels, meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(), output_basename) except FileNotFoundError as e: error_queue.put(str(e)) finally: if meas is not None: meas.chamber.close() meas.ext_sensors.close() if __name__ == '__main__': app = QApplication(sys.argv) mainWindow = TestStandMainWindow(app) mainWindow.show() app.exec_()