[feat] add zero cali function

This commit is contained in:
zhji 2025-11-23 23:34:33 +08:00
parent 3cd0f9301d
commit ef401f2239

302
python/hfc/cali.py Normal file
View File

@ -0,0 +1,302 @@
import sys
import os
import time
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QWidget, QMessageBox)
from PyQt5.QtCore import QTimer, QThread, pyqtSignal, Qt
from PyQt5.QtGui import QPixmap, QFont, QIcon
from pymodbus.client.sync import ModbusSerialClient as ModbusRtuClient
from pymodbus.exceptions import ModbusException
class ModbusWorker(QThread):
"""Modbus通信工作线程"""
concentration_updated = pyqtSignal(int) # 改为int类型
calibration_result = pyqtSignal(bool, str)
def __init__(self, port):
super().__init__()
self.port = port
self.client = None
self.running = True
self.calibrating = False
self.current_step = 0 # 0: 无操作, 1: 第一步, 2: 第二步
def run(self):
"""主线程循环"""
try:
self.client = ModbusRtuClient(
method='rtu',
port=self.port,
baudrate=19200, # 改为19200波特率
bytesize=8,
parity='N',
stopbits=1,
timeout=1
)
if not self.client.connect():
print("无法连接到串口")
return
while self.running:
if not self.calibrating:
# 正常读取浓度
self.read_concentration()
time.sleep(1)
except Exception as e:
print(f"Modbus通信错误: {e}")
finally:
if self.client:
self.client.close()
def read_concentration(self):
"""读取浓度值"""
try:
# 发送读取命令: 01 04 05 20 00 02 70 CD
# 使用read_input_registers函数读取输入寄存器
response = self.client.read_input_registers(
address=0x0520, # 起始地址
count=2, # 读取2个寄存器
unit=1 # 从站地址
)
if not response.isError():
# 解析有符号32位整数
registers = response.registers
if len(registers) == 2:
# 将两个16位寄存器组合成32位有符号整数
value = (registers[0] << 16) | registers[1]
# 转换为有符号整数
if value & 0x80000000:
value = value - 0x100000000
# 直接使用整数,不转换为浮点数
concentration = int(value)
self.concentration_updated.emit(concentration)
else:
print("读取浓度失败")
except ModbusException as e:
print(f"Modbus异常: {e}")
except Exception as e:
print(f"读取浓度异常: {e}")
def start_calibration(self):
"""开始标定流程"""
self.calibrating = True
self.current_step = 1
self.execute_calibration_step1()
def execute_calibration_step1(self):
"""执行标定第一步"""
try:
# 发送第一步命令: 01 06 10 12 FF FE ED 7F
# 使用write_register函数写入单个寄存器
response = self.client.write_register(
address=0x1012, # 寄存器地址
value=0xFFFE, # 写入的值
unit=1 # 从站地址
)
if not response.isError():
# 第一步成功,继续第二步
self.current_step = 2
self.execute_calibration_step2()
else:
self.calibration_result.emit(False, "第一步标定失败")
self.calibrating = False
except Exception as e:
self.calibration_result.emit(False, f"第一步标定异常: {e}")
self.calibrating = False
def execute_calibration_step2(self):
"""执行标定第二步"""
try:
# 发送第二步命令: 01 06 10 3E FF FE 2C B6
response = self.client.write_register(
address=0x103E, # 寄存器地址
value=0xFFFE, # 写入的值
unit=1 # 从站地址
)
if not response.isError():
self.calibration_result.emit(True, "零点标定成功")
else:
self.calibration_result.emit(False, "第二步标定失败")
self.calibrating = False
except Exception as e:
self.calibration_result.emit(False, f"第二步标定异常: {e}")
self.calibrating = False
def stop(self):
"""停止线程"""
self.running = False
self.wait()
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.modbus_worker = None
self.init_ui()
self.init_modbus()
def init_ui(self):
"""初始化用户界面"""
self.setWindowTitle("气体浓度监测与标定系统")
self.setFixedSize(600, 400)
# 设置图标
if os.path.exists("logo.png"):
self.setWindowIcon(QIcon("logo.png"))
# 创建中央部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 创建布局
layout = QVBoxLayout()
central_widget.setLayout(layout)
# 标题
title_label = QLabel("气体浓度监测系统")
title_label.setFont(QFont("Arial", 16, QFont.Bold))
title_label.setStyleSheet("color: #2c3e50; margin: 20px;")
title_label.setAlignment(Qt.AlignCenter)
layout.addWidget(title_label)
# 说明标签
info_label = QLabel("请通空气并等待稳定后点击标定零点按钮")
info_label.setFont(QFont("Arial", 12))
info_label.setStyleSheet("color: #7f8c8d; margin: 10px;")
info_label.setAlignment(Qt.AlignCenter)
layout.addWidget(info_label)
# 浓度显示
self.concentration_label = QLabel("--")
self.concentration_label.setFont(QFont("Arial", 32, QFont.Bold))
self.concentration_label.setStyleSheet("""
QLabel {
color: #e74c3c;
background-color: #ecf0f1;
border: 3px solid #bdc3c7;
border-radius: 10px;
padding: 20px;
margin: 20px;
}
""")
self.concentration_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.concentration_label)
# 按钮
self.calibrate_button = QPushButton("零点标定")
self.calibrate_button.setFont(QFont("Arial", 14))
self.calibrate_button.setStyleSheet("""
QPushButton {
background-color: #3498db;
color: white;
border: none;
border-radius: 5px;
padding: 10px 20px;
margin: 20px;
}
QPushButton:hover {
background-color: #2980b9;
}
QPushButton:pressed {
background-color: #21618c;
}
""")
self.calibrate_button.clicked.connect(self.start_calibration)
layout.addWidget(self.calibrate_button)
# 状态标签
self.status_label = QLabel("系统就绪")
self.status_label.setFont(QFont("Arial", 10))
self.status_label.setStyleSheet("color: #95a5a6; margin: 10px;")
self.status_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.status_label)
layout.addStretch()
def init_modbus(self):
"""初始化Modbus通信"""
try:
# 从文件读取串口号
if os.path.exists("com_config.txt"):
with open("com_config.txt", "r") as f:
port = f.read().strip()
else:
port = "COM1" # 默认值
# 创建Modbus工作线程
self.modbus_worker = ModbusWorker(port)
self.modbus_worker.concentration_updated.connect(self.update_concentration)
self.modbus_worker.calibration_result.connect(self.handle_calibration_result)
self.modbus_worker.start()
self.status_label.setText(f"已连接串口: {port} (19200bps)")
except Exception as e:
QMessageBox.critical(self, "错误", f"初始化Modbus失败: {e}")
def update_concentration(self, concentration):
"""更新浓度显示"""
# 直接显示整数,不加小数位
self.concentration_label.setText(f"{concentration} ppm")
# 根据浓度值改变颜色
if concentration < 100:
color = "#27ae60" # 绿色
elif concentration < 500:
color = "#f39c12" # 橙色
else:
color = "#e74c3c" # 红色
self.concentration_label.setStyleSheet(f"""
QLabel {{
color: {color};
background-color: #ecf0f1;
border: 3px solid #bdc3c7;
border-radius: 10px;
padding: 20px;
margin: 20px;
}}
""")
def start_calibration(self):
"""开始标定"""
if self.modbus_worker and not self.modbus_worker.calibrating:
self.calibrate_button.setEnabled(False)
self.status_label.setText("正在进行零点标定...")
self.modbus_worker.start_calibration()
def handle_calibration_result(self, success, message):
"""处理标定结果"""
self.calibrate_button.setEnabled(True)
if success:
QMessageBox.information(self, "成功", message)
self.status_label.setText("零点标定成功")
else:
QMessageBox.warning(self, "失败", message)
self.status_label.setText("零点标定失败")
def closeEvent(self, event):
"""关闭应用程序"""
if self.modbus_worker:
self.modbus_worker.stop()
event.accept()
if __name__ == "__main__":
app = QApplication(sys.argv)
# 设置应用程序样式
app.setStyle('Fusion')
window = MainWindow()
window.show()
sys.exit(app.exec_())