Newer
Older
from PyQt5.QtWidgets import QMainWindow, QApplication, QPushButton, QTextEdit
from PyQt5 import uic, QtCore, QtGui, QtWidgets
import sys
import process_sync
import multiprocessing as mp
def __init__(self, qt_app, *args, **kwargs):
    """Load the Designer UI and connect the buttons to their handlers.

    Args:
        qt_app: The application object. It is kept on the instance because
            other methods call ``self.qt_app.processEvents()``.
        *args: Forwarded unchanged to the base-class constructor.
        **kwargs: Forwarded unchanged to the base-class constructor.
    """
    super().__init__(*args, **kwargs)
    # Other methods pump the event loop through self.qt_app, so the
    # constructor argument has to be stored; without this they raise
    # AttributeError.
    self.qt_app = qt_app
    uic.loadUi('climate-lab-main.ui', self)

    self.startButton.clicked.connect(lambda: self.do_measurement())
    self.stopButton.clicked.connect(lambda: self.stop_measurement())
    self.tempSweepButton.clicked.connect(lambda: self.set_temp_sweep_labels())
    self.humSweepButton.clicked.connect(lambda: self.set_hum_sweep_labels())

    # Remember the directory the program was started from; other code
    # changes the working directory and returns here via self.start_dir.
    self.start_dir = os.getcwd()
def set_temp_sweep_labels(self):
    """Label the parameter inputs for a sweep over temperature.

    Enables the parameters group box, titles the start/stop/step group
    with the temperature unit and labels the fixed parameter as humidity.
    """
    swept_quantity = 'Temperature [degC]'
    fixed_quantity = 'Humidity [% r.h.]'
    self.parametersGroupBox.setEnabled(True)
    self.startStopStepGroupBox.setTitle(swept_quantity)
    self.fixedParameterLabel.setText(fixed_quantity)
def set_hum_sweep_labels(self):
    """Label the parameter inputs for a sweep over humidity.

    Enables the parameters group box, titles the start/stop/step group
    with the humidity unit and labels the fixed parameter as temperature.
    """
    swept_quantity = 'Humidity [% r.h.]'
    fixed_quantity = 'Temperature [degC]'
    self.parametersGroupBox.setEnabled(True)
    self.startStopStepGroupBox.setTitle(swept_quantity)
    self.fixedParameterLabel.setText(fixed_quantity)
def check_and_change_dir(self, data_folder):
    """Make ``data_folder`` the working directory, creating it if needed.

    If the folder already exists, the user is asked whether the data in it
    may be overwritten. If the path exists but is not a directory, or the
    folder cannot be created, an error dialog is shown.

    Args:
        data_folder: Path of the folder to change into.

    Returns:
        True if the working directory was changed to ``data_folder``;
        False if the user declined, the path is not a directory, or the
        folder could not be created.
    """
    if os.path.isdir(data_folder):
        button_reply = QtWidgets.QMessageBox.question(
            self, 'Warning: \'' + data_folder + '\' already exists',
            'Overwrite data in \'' + data_folder + '\'?',
            QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
            QtWidgets.QMessageBox.No)
        if button_reply != QtWidgets.QMessageBox.Yes:
            return False
    elif os.path.exists(data_folder):
        QtWidgets.QMessageBox.critical(self, 'Error', '\'' + data_folder + '\' exists but is not a directory!')
        return False
    else:
        try:
            os.makedirs(data_folder)
        except OSError:
            # Only file-system failures are reported here; a bare except
            # would also swallow KeyboardInterrupt and programming errors.
            QtWidgets.QMessageBox.critical(self, 'Error', 'Cannot create folder \'' + data_folder + '\'!')
            return False

    os.chdir(data_folder)
    print('changed dir')
    return True
def get_analysis_config(self, time_string):
    """Collect the analysis settings currently selected in the GUI.

    Args:
        time_string: Stored unchanged under the ``'time_string'`` key.

    Returns:
        A dict with the keys ``'dut_name'``, ``'time_string'``, ``'type'``,
        ``'extra_signal_names'`` and ``'normalise'``. When the cable
        analysis button is checked it also contains ``'cable_length'``.
    """
    config = {
        'dut_name': self.dutName.text(),
        'time_string': time_string,
    }

    if not self.analysisTypeCableButton.isChecked():
        config.update({
            'type': 'default',
            'extra_signal_names': [],
            'normalise': [
                self.removeOffsetsSignal0.isChecked(),
                self.removeOffsetsSignal1.isChecked(),
            ],
        })
        return config

    config.update({
        'type': 'rf_cable',
        'extra_signal_names': ['RF_FREQUENCY'],
        'normalise': [True, False],
        'cable_length': self.cableLengthSpinBox.value(),
    })
    return config
def stop_measurement(self):
    """Set the ``process_sync.stop_measurement`` event to request a stop."""
    stop_event = process_sync.stop_measurement
    stop_event.set()
# NOTE(review): the ``def`` line that owns the statements below is not present
# in this excerpt; they presumably open the start-button handler
# (``do_measurement``) -- confirm against the full file.
# Disable the run settings and the start button, and enable the stop button.
self.runSettingsWidget.setEnabled(False)
self.startButton.setEnabled(False)
self.stopButton.setEnabled(True)
# Let Qt process pending events before continuing.
self.qt_app.processEvents();
# get parameters for measurement from GUI
# Go back to the start directory so the relative JSON file names below are
# looked up there.
os.chdir(self.start_dir)
# Timestamp used for the automatic output name and handed to the analysis config.
time_string = time.strftime("%Y_%m_%d-%H_%M_%S")
if self.autoNameCheckbox.isChecked():
output_basename = time_string + "_results"
else:
output_basename = self.baseName.text()
# Load the test stand configuration and the external sensor channel list.
with open('test_stand_parameter.json', 'r') as f:
config_data = json.load(f)
with open('ext_sensor_channels.json', 'r') as f2:
ext_sensor_channels = json.load(f2)
# Change into <data_folder>/<output_basename>; the call returns False on failure.
# NOTE(review): the body of this ``if`` is missing from the excerpt.
if not self.check_and_change_dir(config_data['data_folder'] + '/' + output_basename):
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Translate the checked button into the measurement type string passed to the worker.
# NOTE(review): no ``else`` is visible, so measurement_type looks unbound when
# none of the three buttons is checked -- confirm the UI guarantees a selection.
if self.tempSweepButton.isChecked():
measurement_type = 'temperature'
elif self.humSweepButton.isChecked():
measurement_type = 'humidity'
elif self.measurementFileButton.isChecked():
measurement_type = 'file'
# Run the measurement in a separate process created with the 'spawn' start method.
ctx = mp.get_context('spawn')
# Queue (maximum size 3) read below for error messages from the worker.
error_queue = ctx.Queue(3)
p = ctx.Process(target=TestStandMainWindow.do_measurement_impl,
args=(measurement_type, output_basename, self.startParameter.value(), self.stopParameter.value(),
self.stepParameter.value(), self.fixedParameter.value(), self.soakingTime.value(),
self.stableReads.value(), self.get_analysis_config(time_string),
os.path.join(self.start_dir, self.measurementFile.text()), error_queue, config_data,
ext_sensor_channels, process_sync.stop_measurement))
p.start()
# Poll the worker every 0.1 s and keep processing Qt events while it runs.
while p.is_alive():
time.sleep(0.1)
self.qt_app.processEvents();
p.join()
# Show each queued error message in its own dialog.
while not error_queue.empty():
QtWidgets.QMessageBox.critical(self, 'Error', error_queue.get())
# Re-enable the run settings and the start button, and disable the stop button.
self.runSettingsWidget.setEnabled(True)
self.startButton.setEnabled(True)
self.stopButton.setEnabled(False)
# The part that is started in a separate process.
# Depending on measurement_type ('temperature', 'humidity' or 'file') it runs a
# sweep or the measurements listed in measurement_file, then calls the plotting
# helpers. The message of a FileNotFoundError is put on error_queue.
# NOTE(review): this excerpt is truncated inside the function (no ``try:`` for the
# ``except`` below, two calls left open) -- check the full file before editing.
@staticmethod
def do_measurement_impl(measurement_type, output_basename, start_parameter, stop_parameter, step_parameter,
fixed_parameter, soaking_time, stable_reads, analysis_config, measurement_file, error_queue,
config_data, ext_sensor_channels, stop_measurement_event):
# Rebind the module-level stop event to the one passed in as an argument.
process_sync.stop_measurement = stop_measurement_event
meas = prototype.Measurements(config_data, output_basename,False, ext_sensor_channels)
if measurement_type == 'temperature':
temperatures = meas.perform_sweep(start_parameter, stop_parameter, step_parameter, fixed_parameter,
soaking_time, stable_reads,
# NOTE(review): the call above is not closed in this excerpt; by analogy with the
# humidity branch a final 'temperature' argument appears to be missing.
temp_extensions = []
# One name extension per sweep point: "<temperature>deg_<fixed humidity>rh".
for t in temperatures:
temp_extensions.append(str(t) + 'deg_' + str(fixed_parameter) + 'rh')
analysis.plot_sweep(temperatures, [fixed_parameter] * len(temperatures), output_basename,
'temperature', meas.dut.get_measurement_set_names(),
meas.dut.get_dut_reference_signal_names(), analysis_config)
prototype.plot_output(output_basename, temp_extensions, True, config_data, ext_sensor_channels,
meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(),
output_basename + ': Temperature sweep ' + str(temperatures[0]) + '--' +
str(temperatures[-1]) + ' degC @ ' + str(fixed_parameter) + ' % r.h.')
elif measurement_type == 'humidity':
humidities = meas.perform_sweep(start_parameter, stop_parameter, step_parameter, fixed_parameter,
soaking_time, stable_reads, 'humidity')
hum_extensions = []
# One name extension per sweep point: "<fixed temperature>deg_<humidity>rh".
for h in humidities:
hum_extensions.append(str(fixed_parameter) + 'deg_' + str(h) + 'rh')
analysis.plot_sweep([fixed_parameter] * len(humidities), humidities, output_basename,
'humidity', meas.dut.get_measurement_set_names(),
meas.dut.get_dut_reference_signal_names(), analysis_config)
prototype.plot_output(output_basename, hum_extensions, True, config_data, ext_sensor_channels,
meas.dut.get_measurement_set_names(), meas.dut.get_dut_reference_signal_names(),
output_basename + ': Humidity sweep ' + str(humidities[0]) + '--' +
str(humidities[-1]) + ' % r.h. @ ' + str(fixed_parameter) + ' degC')
elif measurement_type == 'file':
n_measurements = meas.perform_measurements(measurement_file)
prototype.plot_output(output_basename, range(n_measurements), True, config_data,
ext_sensor_channels,
meas.dut.get_measurement_set_names(),
meas.dut.get_dut_reference_signal_names(),
# NOTE(review): the call above is left open in this excerpt; the other plot_output
# calls pass a title string as their last argument.
# Report a missing file to the caller through the queue instead of raising.
except FileNotFoundError as e:
error_queue.put(str(e))
# Close the chamber and the external sensors if the Measurements object exists.
if meas is not None:
meas.chamber.close()
meas.ext_sensors.close()
if __name__ == '__main__':
    # The guard matters here: the measurement runs in a process created with
    # the 'spawn' start method, which imports this module again in the child.
    app = QApplication(sys.argv)
    mainWindow = TestStandMainWindow(app)
    mainWindow.show()
    # Pass the event loop's return code on as the process exit status
    # instead of discarding it.
    sys.exit(app.exec_())