demo/python/hfc/cali.py
2025-11-30 15:58:44 +08:00

432 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import time
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QWidget, QMessageBox)
from PyQt5.QtCore import QTimer, QThread, pyqtSignal, Qt
from PyQt5.QtGui import QPixmap, QFont, QIcon
from pymodbus.client.sync import ModbusSerialClient as ModbusRtuClient
from pymodbus.exceptions import ModbusException
import hashlib
import base64
import uuid
import subprocess
import platform
import logging
class ModbusWorker(QThread):
    """Worker thread that owns the Modbus RTU serial connection.

    While running, the thread reads the concentration value roughly once per
    ``POLL_INTERVAL`` seconds and publishes it through
    ``concentration_updated``. A zero calibration is requested with
    :meth:`start_calibration`; the two register writes are then carried out
    inside :meth:`run`, so the serial client is only ever used from this
    worker thread and the calling thread is never blocked by serial I/O.
    The outcome is reported through ``calibration_result``.
    """

    # Emitted with the latest concentration as a signed integer.
    concentration_updated = pyqtSignal(int)
    # Emitted with (success, message) when a calibration attempt finishes.
    calibration_result = pyqtSignal(bool, str)

    # Seconds between two polling cycles.
    POLL_INTERVAL = 1.0
    # Granularity of the wait between cycles; keeps stop/calibration
    # requests from having to wait for a full polling interval.
    _WAIT_SLICE = 0.05

    def __init__(self, port):
        """Store the serial port name; the connection is opened in run().

        Args:
            port: Name of the serial port handed to the Modbus client.
        """
        super().__init__()
        self.port = port
        self.client = None
        self.running = True
        # True from the moment a calibration is requested until it finishes.
        self.calibrating = False
        # 0: idle, 1: first calibration step, 2: second calibration step
        self.current_step = 0

    def run(self):
        """Thread body: connect, then poll or calibrate until stopped."""
        try:
            self.client = ModbusRtuClient(
                method='rtu',
                port=self.port,
                baudrate=19200,
                bytesize=8,
                parity='N',
                stopbits=1,
                timeout=1
            )
            if not self.client.connect():
                print("无法连接到串口")
                return
            while self.running:
                if self.calibrating:
                    # Serve the pending request here so that all serial
                    # traffic stays on this thread.
                    self.execute_calibration_step1()
                else:
                    self.read_concentration()
                self._wait_for_next_cycle()
        except Exception as e:
            print(f"Modbus通信错误: {e}")
        finally:
            if self.client:
                self.client.close()
            if self.calibrating:
                # A request arrived but the loop ended before serving it.
                self.calibrating = False
                # Stay silent when the exit was an explicit stop().
                if self.running:
                    self.calibration_result.emit(False, "串口未连接,无法进行标定")

    def _wait_for_next_cycle(self):
        """Sleep until the next poll is due, a stop, or a calibration request."""
        deadline = time.monotonic() + self.POLL_INTERVAL
        while self.running and not self.calibrating:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(self._WAIT_SLICE, remaining))

    def read_concentration(self):
        """Read the concentration and emit it via ``concentration_updated``.

        Reads two input registers starting at 0x0520 from unit 1 (request
        frame: 01 04 05 20 00 02 70 CD) and combines them, high word first,
        into a signed 32-bit integer. Errors are printed, never raised.
        """
        try:
            response = self.client.read_input_registers(
                address=0x0520,  # start address
                count=2,         # two 16-bit registers
                unit=1           # slave address
            )
            if not response.isError():
                registers = response.registers
                if len(registers) == 2:
                    # Join the two 16-bit words into one 32-bit value.
                    value = (registers[0] << 16) | registers[1]
                    # Reinterpret as two's-complement signed.
                    if value & 0x80000000:
                        value = value - 0x100000000
                    # Kept as an integer on purpose (no float conversion).
                    concentration = int(value)
                    self.concentration_updated.emit(concentration)
            else:
                print("读取浓度失败")
        except ModbusException as e:
            print(f"Modbus异常: {e}")
        except Exception as e:
            print(f"读取浓度异常: {e}")

    def start_calibration(self):
        """Request a zero calibration.

        Only records the request; the register writes are executed by the
        worker thread in run(). If the thread is not running, the failure is
        reported immediately through ``calibration_result``.
        """
        if not self.isRunning():
            self.calibration_result.emit(False, "串口未连接,无法进行标定")
            return
        # Set the step before raising the flag that the worker polls.
        self.current_step = 1
        self.calibrating = True

    def execute_calibration_step1(self):
        """Run calibration step one and, on success, continue with step two.

        Writes 0xFFFE to register 0x1012 of unit 1
        (request frame: 01 06 10 12 FF FE ED 7F).
        """
        try:
            response = self.client.write_register(
                address=0x1012,  # register address
                value=0xFFFE,    # value to write
                unit=1           # slave address
            )
            if not response.isError():
                # Step one succeeded, move on to step two.
                self.current_step = 2
                self.execute_calibration_step2()
            else:
                self.calibration_result.emit(False, "第一步标定失败")
                self.calibrating = False
        except Exception as e:
            self.calibration_result.emit(False, f"第一步标定异常: {e}")
            self.calibrating = False

    def execute_calibration_step2(self):
        """Run calibration step two and report the final result.

        Writes 0xFFFE to register 0x103E of unit 1
        (request frame: 01 06 10 3E FF FE 2C B6).
        """
        try:
            response = self.client.write_register(
                address=0x103E,  # register address
                value=0xFFFE,    # value to write
                unit=1           # slave address
            )
            if not response.isError():
                self.calibration_result.emit(True, "零点标定成功")
            else:
                self.calibration_result.emit(False, "第二步标定失败")
        except Exception as e:
            self.calibration_result.emit(False, f"第二步标定异常: {e}")
        finally:
            # The attempt is over whatever the outcome; resume polling.
            self.calibrating = False

    def stop(self):
        """Ask the thread to finish and block until it has exited."""
        self.running = False
        self.wait()
class MainWindow(QMainWindow):
    """Main window: shows the live concentration and a zero-calibration button.

    The license is validated once at construction time and the result is
    cached in ``license_valid``, so that periodic concentration updates do
    not re-run the check on the GUI thread.
    """

    # Stylesheet of the calibration button.
    _BUTTON_STYLE = (
        "\n"
        "QPushButton {\n"
        "background-color: #3498db;\n"
        "color: white;\n"
        "border: none;\n"
        "border-radius: 5px;\n"
        "padding: 10px 20px;\n"
        "margin: 20px;\n"
        "}\n"
        "QPushButton:hover {\n"
        "background-color: #2980b9;\n"
        "}\n"
        "QPushButton:pressed {\n"
        "background-color: #21618c;\n"
        "}\n"
    )

    def __init__(self):
        """Build the UI and start the Modbus worker."""
        super().__init__()
        self.modbus_worker = None
        # Checked once here; update_concentration() runs for every reading
        # and must not repeat the check each time.
        self.license_valid = bool(license_check())
        self.init_ui()
        self.init_modbus()

    @staticmethod
    def _concentration_style(color):
        """Return the stylesheet of the concentration label for a text color."""
        return (
            "\n"
            "QLabel {\n"
            f"color: {color};\n"
            "background-color: #ecf0f1;\n"
            "border: 3px solid #bdc3c7;\n"
            "border-radius: 10px;\n"
            "padding: 20px;\n"
            "margin: 20px;\n"
            "}\n"
        )

    def init_ui(self):
        """Create and lay out all widgets."""
        self.setWindowTitle("气体浓度监测与标定系统")
        self.setFixedSize(600, 400)
        # Window icon, only if the image file is present
        if os.path.exists("logo.png"):
            self.setWindowIcon(QIcon("logo.png"))
        # Central widget with a vertical layout
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout()
        central_widget.setLayout(layout)
        # Title
        title_label = QLabel("气体浓度监测系统")
        title_label.setFont(QFont("Arial", 16, QFont.Bold))
        title_label.setStyleSheet("color: #2c3e50; margin: 20px;")
        title_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(title_label)
        # Operator instructions
        info_label = QLabel("请通空气并等待稳定后点击标定零点按钮")
        info_label.setFont(QFont("Arial", 12))
        info_label.setStyleSheet("color: #7f8c8d; margin: 10px;")
        info_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(info_label)
        # Concentration read-out
        self.concentration_label = QLabel("--")
        self.concentration_label.setFont(QFont("Arial", 32, QFont.Bold))
        self.concentration_label.setStyleSheet(self._concentration_style("#e74c3c"))
        self.concentration_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.concentration_label)
        # Calibration button
        self.calibrate_button = QPushButton("零点标定")
        self.calibrate_button.setFont(QFont("Arial", 14))
        self.calibrate_button.setStyleSheet(self._BUTTON_STYLE)
        self.calibrate_button.clicked.connect(self.start_calibration)
        if not self.license_valid:
            self.calibrate_button.setEnabled(False)
            self.calibrate_button.setText("许可证无效,请联系管理员")
        layout.addWidget(self.calibrate_button)
        # Status line
        self.status_label = QLabel("系统就绪")
        self.status_label.setFont(QFont("Arial", 10))
        self.status_label.setStyleSheet("color: #95a5a6; margin: 10px;")
        self.status_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.status_label)
        layout.addStretch()

    def init_modbus(self):
        """Read the serial port name and start the Modbus worker thread.

        The port is taken from ``com_config.txt`` when that file exists and
        is not blank; otherwise ``COM1`` is used. Any failure is shown in a
        message box.
        """
        try:
            port = "COM1"  # default
            if os.path.exists("com_config.txt"):
                with open("com_config.txt", "r") as f:
                    # A blank file would yield an unusable empty port name.
                    port = f.read().strip() or port
            self.modbus_worker = ModbusWorker(port)
            self.modbus_worker.concentration_updated.connect(self.update_concentration)
            self.modbus_worker.calibration_result.connect(self.handle_calibration_result)
            self.modbus_worker.start()
            self.status_label.setText(f"已连接串口: {port} (19200bps)")
        except Exception as e:
            QMessageBox.critical(self, "错误", f"初始化Modbus失败: {e}")

    def update_concentration(self, concentration):
        """Show a new reading and recolor the label by its magnitude.

        Args:
            concentration: Integer reading delivered by the worker.
        """
        # The value is shown as a plain integer, without decimals.
        if self.license_valid:
            self.concentration_label.setText(f"{concentration} ppm")
        else:
            self.concentration_label.setText("许可证无效")
        # Color thresholds: below 100 green, below 500 orange, otherwise red
        if concentration < 100:
            color = "#27ae60"
        elif concentration < 500:
            color = "#f39c12"
        else:
            color = "#e74c3c"
        self.concentration_label.setStyleSheet(self._concentration_style(color))

    def start_calibration(self):
        """Start a zero calibration unless one is already in progress."""
        if self.modbus_worker and not self.modbus_worker.calibrating:
            self.calibrate_button.setEnabled(False)
            self.status_label.setText("正在进行零点标定...")
            self.modbus_worker.start_calibration()

    def handle_calibration_result(self, success, message):
        """Re-enable the button and show the calibration outcome.

        Args:
            success: True when the calibration succeeded.
            message: Text shown to the user in the message box.
        """
        self.calibrate_button.setEnabled(True)
        if success:
            QMessageBox.information(self, "成功", message)
            self.status_label.setText("零点标定成功")
        else:
            QMessageBox.warning(self, "失败", message)
            self.status_label.setText("零点标定失败")

    def closeEvent(self, event):
        """Stop the worker thread before the window closes."""
        if self.modbus_worker:
            self.modbus_worker.stop()
        event.accept()
def get_windows_serial_number():
    """
    Return an identifier string for this Windows machine.

    The BIOS serial number reported by WMIC is preferred; two known
    placeholder values are skipped. If no usable BIOS serial is found, the
    first disk drive serial number is returned instead. On any failure
    (including running on a non-Windows system) a warning is logged and
    ``str(uuid.getnode())`` is returned as a fallback.
    """
    placeholder_serials = ("System Serial Number", "To be filled by O.E.M.")

    def wmic_rows(command):
        # Run the command and hand back its output lines, each stripped.
        output = subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,
            text=True
        )
        return [row.strip() for row in output.strip().split('\n')]

    try:
        if platform.system() != "Windows":
            raise Exception("此功能仅支持Windows系统")
        # First choice: BIOS serial number
        for candidate in wmic_rows('wmic bios get serialnumber'):
            if not candidate or "SerialNumber" in candidate:
                continue  # blank line or the column header
            if candidate not in placeholder_serials:
                return candidate
        # Second choice: disk drive serial number
        for candidate in wmic_rows('wmic diskdrive get serialnumber'):
            if candidate and "SerialNumber" not in candidate:
                return candidate
        raise Exception("无法获取系统序列号")
    except Exception as e:
        logging.warning(f"获取Windows序列号失败: {e}")
        # Fallback identifier
        return str(uuid.getnode())
def get_mac_address():
    """
    Return the value of ``uuid.getnode()`` formatted as a MAC address.

    The result is six colon-separated, upper-case hexadecimal byte pairs.
    If anything goes wrong the error is logged and an all-zero address is
    returned.
    """
    try:
        # Twelve zero-padded upper-case hex digits
        node_hex = f"{uuid.getnode():012X}"
        octets = [node_hex[pos:pos + 2] for pos in range(0, 12, 2)]
        return ':'.join(octets)
    except Exception as e:
        logging.error(f"获取MAC地址失败: {e}")
        return "00:00:00:00:00:00"
def triple_hash_sha384(data):
    """
    Apply SHA-384 three times in a row and return the final hex digest.

    Each round hashes the UTF-8 encoding of the previous round's
    hexadecimal digest string; the first round hashes ``data`` itself.
    """
    digest = data
    for _ in range(3):
        digest = hashlib.sha384(digest.encode('utf-8')).hexdigest()
    return digest
def triple_hash_sha256(data):
    """
    Apply SHA-256 three times in a row and return the final hex digest.

    Each round hashes the UTF-8 encoding of the previous round's
    hexadecimal digest string; the first round hashes ``data`` itself.
    """
    digest = data
    for _ in range(3):
        digest = hashlib.sha256(digest.encode('utf-8')).hexdigest()
    return digest
def license_check():
    """
    Check whether the ``license`` file matches this machine.

    The expected license text is derived as follows: triple SHA-384 of the
    Windows serial number, concatenated with triple SHA-256 of the MAC
    address, Base64-encoded, hashed with triple SHA-384 again and
    Base64-encoded once more. Returns True only when the stripped file
    content equals that value; returns False when the file is missing or
    any step raises.
    """
    def to_base64(text):
        # Base64 of the UTF-8 bytes, returned as text
        return base64.b64encode(text.encode('utf-8')).decode('utf-8')

    try:
        # No license file, no license
        if not os.path.exists('license'):
            return False
        with open('license', 'r', encoding='utf-8') as license_file:
            stored_license = license_file.read().strip()
        # Collect both machine identifiers before hashing them
        serial_number, mac_address = get_windows_serial_number(), get_mac_address()
        machine_digest = triple_hash_sha384(serial_number) + triple_hash_sha256(mac_address)
        expected_license = to_base64(triple_hash_sha384(to_base64(machine_digest)))
        return stored_license == expected_license
    except Exception as e:
        print(f"\n许可证获取失败: {e}")
        return False
# Script entry point: create the Qt application, show the main window and
# hand control to the Qt event loop; its return code becomes the exit code.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    # Select the application widget style
    app.setStyle('Fusion')
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())