import tkinter as tk from tkinter import ttk, messagebox import serial import serial.tools.list_ports import threading import random import time from datetime import datetime import struct from PIL import Image, ImageTk, ImageDraw, ImageFont import os import hashlib import base64 import uuid import subprocess import platform import logging import paho.mqtt.client as mqtt import json class ModbusGasAnalyzer: def __init__(self, root): self.root = root self.root.title("HFC-RapidScan多通道灭火剂气体快检仪") self.custom_blue = "#0161DA" # 这是你的RGB颜色 # 使用PNG图片作为图标 try: img = Image.open("logo.png") photo = ImageTk.PhotoImage(img) self.root.iconphoto(True, photo) # False表示不用于子窗口 # 保存引用防止垃圾回收 self.icon_image = photo except Exception as e: print(f"设置图标失败: {e}") # 设置黄金比例窗口大小 (800x480) window_width = 800 window_height = 480 self.root.geometry(f"{window_width}x{window_height}") self.root.configure(bg=self.custom_blue) # Modbus通信参数 self.serial_port = None self.is_connected = False self.is_testing = False # 测试数据 self.concentration = 0.0 self.start_time = None self.test_duration = 0 self.last_concentrations = [] # 用于检测浓度变化 self.average_concentration = 0 self.show_concentration = 0 # 用户信息 - 初始化空值 self.user_info = { "测试人员": "", "测试时间": "", "样品信息": "", "出厂日期": "", "产品型号": "", "编号": "", "测试单位": "", "生产单位": "", "送样单位": "", } # 服务器MQTT信息 - 初始化值 self.mqtt_info = { "服务器地址": "122.112.229.121", "端口号": "1883", "主题": "xiaofang/sensor/data", "用户名": "xiaofang", "密码": "xiaofang@qwer", } self.mqtt_client = None # 创建页面 self.create_page1() def create_page1(self): """创建第一个页面""" self.clear_window() # 主框架 main_frame = tk.Frame(self.root, bg=self.custom_blue) main_frame.pack(fill='both', expand=True, padx=20, pady=20) # 左侧区域 left_frame = tk.Frame(main_frame, bg=self.custom_blue) left_frame.pack(side='left', fill='both', expand=True, padx=(0, 10)) # 右侧区域 right_frame = tk.Frame(main_frame, bg=self.custom_blue) right_frame.pack(side='right', fill='both', expand=True, padx=(10, 0)) # 左侧内容 # 测试人员 tk.Label(left_frame, text="测试人员:", font=('Arial', 12), bg=self.custom_blue).grid(row=0, column=0, sticky='w', pady=10, padx=(30, 20)) self.tester_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.tester_entry.grid(row=0, column=1, sticky='ew', pady=10) # 保留之前填写的信息 if self.user_info["测试人员"]: self.tester_entry.insert(0, self.user_info["测试人员"]) # 测试时间 tk.Label(left_frame, text="测试时间:", font=('Arial', 12), bg=self.custom_blue).grid(row=1, column=0, sticky='w', pady=10, padx=(30, 20)) self.test_time_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.test_time_entry.grid(row=1, column=1, sticky='ew', pady=10) # 自动填入当前时间,但如果已有信息则保留 if self.user_info["测试时间"]: self.test_time_entry.insert(0, self.user_info["测试时间"]) else: current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.test_time_entry.insert(0, current_time) # 样品信息 tk.Label(left_frame, text="样品信息:", font=('Arial', 12), bg=self.custom_blue).grid(row=2, column=0, sticky='w', pady=10, padx=(30, 20)) self.sample_info_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.sample_info_entry.grid(row=2, column=1, sticky='ew', pady=10) if self.user_info["样品信息"]: self.sample_info_entry.insert(0, self.user_info["样品信息"]) # 出厂日期 tk.Label(left_frame, text="出厂日期:", font=('Arial', 12), bg=self.custom_blue).grid(row=3, column=0, sticky='w', pady=10, padx=(30, 20)) self.production_date_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.production_date_entry.grid(row=3, column=1, sticky='ew', pady=10) if self.user_info["出厂日期"]: self.production_date_entry.insert(0, self.user_info["出厂日期"]) # 产品型号 tk.Label(left_frame, text="产品型号:", font=('Arial', 12), bg=self.custom_blue).grid(row=4, column=0, sticky='w', pady=10, padx=(30, 20)) self.model_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.model_entry.grid(row=4, column=1, sticky='ew', pady=10) if self.user_info["产品型号"]: self.model_entry.insert(0, self.user_info["产品型号"]) # 编号 tk.Label(left_frame, text="编号:", font=('Arial', 12), bg=self.custom_blue).grid(row=5, column=0, sticky='w', pady=10, padx=(30, 20)) self.serial_number_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.serial_number_entry.grid(row=5, column=1, sticky='ew', pady=10) if self.user_info["编号"]: self.serial_number_entry.insert(0, self.user_info["编号"]) # 测试单位 tk.Label(left_frame, text="测试单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=6, column=0, sticky='w', pady=10, padx=(30, 20)) self.test_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.test_institute_entry.grid(row=6, column=1, sticky='ew', pady=10) if self.user_info["测试单位"]: self.test_institute_entry.insert(0, self.user_info["测试单位"]) # 生产单位 tk.Label(left_frame, text="生产单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=7, column=0, sticky='w', pady=10, padx=(30, 20)) self.production_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.production_institute_entry.grid(row=7, column=1, sticky='ew', pady=10) if self.user_info["生产单位"]: self.production_institute_entry.insert(0, self.user_info["生产单位"]) # 送样单位 tk.Label(left_frame, text="送样单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=8, column=0, sticky='w', pady=10, padx=(30, 20)) self.sample_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.sample_institute_entry.grid(row=8, column=1, sticky='ew', pady=10) if self.user_info["送样单位"]: self.sample_institute_entry.insert(0, self.user_info["送样单位"]) # 右侧内容 - 标题 title_label = tk.Label(right_frame, text="HFC-RapidScan多通道\n灭火剂气体快检仪", font=('Arial', 18, 'bold'), bg=self.custom_blue, justify='center') title_label.pack(pady=40) # 开始测试按钮 if license_check(): self.start_button = tk.Button(right_frame, text="开始测试", font=('Arial', 16, 'bold'), bg='green', fg='white', width=15, height=2, command=self.start_test) self.start_button.pack(pady=40) else: self.start_button = tk.Button(right_frame, text="许可证无效\n请联系管理员", font=('Arial', 16, 'bold'), bg='green', fg='white', width=15, height=2, command=self.start_test) self.start_button.config(state="disable") self.start_button.pack(pady=40) def create_page2(self): """创建第二个页面""" self.clear_window() main_frame = tk.Frame(self.root, bg=self.custom_blue) main_frame.pack(fill='both', expand=True, padx=20, pady=20) # 标题 title_label = tk.Label(main_frame, text="HFC-RapidScan多通道灭火剂气体快检仪", font=('Arial', 16, 'bold'), bg=self.custom_blue) title_label.pack(pady=10) # 通道信息 channel_frame = tk.Frame(main_frame, bg=self.custom_blue) channel_frame.pack(fill='x', pady=20) tk.Label(channel_frame, text="通道一:\n七氟丙烷", font=('Arial', 20, 'bold'), bg=self.custom_blue).pack(side='left', expand=True) tk.Label(channel_frame, text="通道二:\n全氟己酮", font=('Arial', 20, 'bold'), bg=self.custom_blue).pack(side='left', expand=True) tk.Label(channel_frame, text="通道三:\n哈龙1301", font=('Arial', 20, 'bold'), bg=self.custom_blue).pack(side='left', expand=True) tk.Label(channel_frame, text="通道四:\n哈龙1211", font=('Arial', 20, 'bold'), bg=self.custom_blue).pack(side='left', expand=True) # 浓度显示 concentration_frame = tk.Frame(main_frame, bg=self.custom_blue) concentration_frame.pack(fill='both', expand=True) self.concentration_label = tk.Label(concentration_frame, text="0.000%", font=('Arial', 36, 'bold'), bg=self.custom_blue, fg='red') label2 = tk.Label(concentration_frame, text="已关闭\n请升级", font=('Arial', 36, 'bold'), bg=self.custom_blue, fg='red') label3 = tk.Label(concentration_frame, text="已关闭\n请升级", font=('Arial', 36, 'bold'), bg=self.custom_blue, fg='red') label4 = tk.Label(concentration_frame, text="已关闭\n请升级", font=('Arial', 36, 'bold'), bg=self.custom_blue, fg='red') self.concentration_label.pack(side='left', expand=True) label2.pack(side='left', expand=True) label3.pack(side='left', expand=True) label4.pack(side='left', expand=True) # 返回按钮 back_button = tk.Button(main_frame, text="返回", font=('Arial', 12), command=self.back_to_page1, bg='orange') back_button.pack(pady=10) # 开始Modbus通信 self.start_modbus_communication() def create_page3(self): """创建第三个页面 - 结果汇总""" self.clear_window() main_frame = tk.Frame(self.root, bg=self.custom_blue) main_frame.pack(fill='both', expand=True, padx=20, pady=20) # 标题 title_label = tk.Label(main_frame, text="HFC-RapidScan多通道灭火剂气体快检仪", font=('Arial', 16, 'bold'), bg=self.custom_blue) title_label.pack(pady=10) # 结果显示框架 result_frame = tk.Frame(main_frame, bg='white', relief='raised', bd=2) result_frame.pack(fill='both', expand=True, padx=20, pady=10) # 创建滚动框架以适应所有内容 canvas = tk.Canvas(result_frame, bg='white') scrollbar = tk.Scrollbar(result_frame, orient="vertical", command=canvas.yview) scrollable_frame = tk.Frame(canvas, bg='white') scrollable_frame.bind( "", lambda e: canvas.configure(scrollregion=canvas.bbox("all")) ) canvas.create_window((0, 0), window=scrollable_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) # 左侧信息框架 info_frame = tk.Frame(scrollable_frame, bg='white') info_frame.pack(side='left', fill='both', expand=True, padx=20, pady=20) # 显示所有用户信息 info_items = [ ("测试人员:", self.user_info["测试人员"]), ("测试时间:", self.user_info["测试时间"]), ("样品信息:", self.user_info["样品信息"]), ("出厂日期:", self.user_info["出厂日期"]), ("产品型号:", self.user_info["产品型号"]), ("编号:", self.user_info["编号"]), ("测试单位:", self.user_info["测试单位"]), ("生产单位:", self.user_info["生产单位"]), ("送样单位:", self.user_info["送样单位"]), ] for label, value in info_items: label_frame = tk.Frame(info_frame, bg='white') label_frame.pack(fill='x', pady=3) tk.Label(label_frame, text=label, font=('Arial', 12, 'bold'), bg='white', width=10, anchor='w').pack(side='left') tk.Label(label_frame, text=value, font=('Arial', 12), bg='white', anchor='w').pack(side='left', fill='x', expand=True) # 右侧结果框架 result_right_frame = tk.Frame(scrollable_frame, bg='white') result_right_frame.pack(side='right', fill='both', expand=True, padx=20, pady=20) tk.Label(result_right_frame, text="最终浓度", font=('Arial', 16, 'bold'), bg='white').pack(pady=10) final_concentration = max(0.0, min(self.show_concentration, 99.990)) concentration_label = tk.Label(result_right_frame, text=f"{final_concentration:.3f}%", font=('Arial', 36, 'bold'), bg='white', fg='blue') concentration_label.pack(pady=20) tk.Label(result_right_frame, text=f"测试时长: {self.test_duration}秒", font=('Arial', 12), bg='white').pack(pady=10) # 打包canvas和scrollbar canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # 按钮框架 button_frame = tk.Frame(main_frame, bg=self.custom_blue) button_frame.pack(fill='x', pady=10) tk.Button(button_frame, text="重新测试", font=('Arial', 12), command=self.back_to_page1, bg='green', fg='white').pack(side='left', padx=20) tk.Button(button_frame, text="保存报告", font=('Arial', 12), command=self.save_report, bg='blue', fg='white').pack(side='left', padx=20) tk.Button(button_frame, text="上传到云", font=('Arial', 12), command=self.upload_mqtt, bg='green', fg='white').pack(side='left', padx=20) tk.Button(button_frame, text="退出", font=('Arial', 12), command=self.root.quit, bg='red', fg='white').pack(side='right', padx=20) # 自动保存报告 self.save_report() def create_page4(self): """创建第四个页面""" self.clear_window() # 创建一个主容器,包含顶部内容区和底部按钮区 container = tk.Frame(self.root, bg=self.custom_blue) container.pack(fill='both', expand=True, padx=20, pady=20) # 顶部内容框架 content_frame = tk.Frame(container, bg=self.custom_blue) content_frame.pack(fill='both', expand=True) # 左侧区域 left_frame = tk.Frame(content_frame, bg=self.custom_blue) left_frame.pack(side='left', fill='both', expand=True, padx=(0, 10), pady=(50, 0)) # 右侧区域 right_frame = tk.Frame(content_frame, bg=self.custom_blue) right_frame.pack(side='right', fill='both', expand=True, padx=(10, 0)) # 左侧内容 # 服务器地址 tk.Label(left_frame, text="服务器地址:", font=('Arial', 12), bg=self.custom_blue).grid(row=0, column=0, sticky='w', pady=15, padx=(30, 20)) self.ip_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.ip_entry.grid(row=0, column=1, sticky='ew', pady=10) # 保留之前填写的信息 if self.mqtt_info["服务器地址"]: self.ip_entry.insert(0, self.mqtt_info["服务器地址"]) # 端口号 tk.Label(left_frame, text="端口号:", font=('Arial', 12), bg=self.custom_blue).grid(row=1, column=0, sticky='w', pady=15, padx=(30, 20)) self.port_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.port_entry.grid(row=1, column=1, sticky='ew', pady=10) # 自动填入当前时间,但如果已有信息则保留 if self.mqtt_info["端口号"]: self.port_entry.insert(0, self.mqtt_info["端口号"]) # 主题 tk.Label(left_frame, text="主题:", font=('Arial', 12), bg=self.custom_blue).grid(row=2, column=0, sticky='w', pady=15, padx=(30, 20)) self.topic_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.topic_entry.grid(row=2, column=1, sticky='ew', pady=10) if self.mqtt_info["主题"]: self.topic_entry.insert(0, self.mqtt_info["主题"]) # 用户名 tk.Label(left_frame, text="用户名:", font=('Arial', 12), bg=self.custom_blue).grid(row=3, column=0, sticky='w', pady=15, padx=(30, 20)) self.user_name_entry = tk.Entry(left_frame, font=('Arial', 12), width=20) self.user_name_entry.grid(row=3, column=1, sticky='ew', pady=10) if self.mqtt_info["用户名"]: self.user_name_entry.insert(0, self.mqtt_info["用户名"]) # 密码 tk.Label(left_frame, text="密码:", font=('Arial', 12), bg=self.custom_blue).grid(row=4, column=0, sticky='w', pady=15, padx=(30, 20)) self.user_password_entry = tk.Entry(left_frame, font=('Arial', 12), show="*", width=20) self.user_password_entry.grid(row=4, column=1, sticky='ew', pady=10) if self.mqtt_info["密码"]: self.user_password_entry.insert(0, self.mqtt_info["密码"]) # 右侧内容 - 标题 title_label = tk.Label(right_frame, text="HFC-RapidScan多通道\n灭火剂气体快检仪", font=('Arial', 18, 'bold'), bg=self.custom_blue, justify='center') title_label.pack(pady=40) # 开始上传按钮 self.upload_mqtt_start_button = tk.Button(right_frame, text="开始上传", font=('Arial', 16, 'bold'), bg='green', fg='white', width=15, height=2, command=self.upload_mqtt_start) self.upload_mqtt_start_button.pack(pady=40) # 底部按钮框架 - 放在容器的最下方 button_frame = tk.Frame(container, bg=self.custom_blue) button_frame.pack(side='bottom', fill='x', pady=(20, 10)) tk.Button(button_frame, text="重新测试", font=('Arial', 12), command=self.back_to_page1, bg='green', fg='white').pack(side='left', padx=50) tk.Button(button_frame, text="退出", font=('Arial', 12), command=self.root.quit, bg='red', fg='white').pack(side='right', padx=50) def save_report(self): """保存汇总信息为JPG图片""" try: # 创建图片 - 增大高度以容纳所有信息 img_width = 800 img_height = 600 img = Image.new('RGB', (img_width, img_height), color=self.custom_blue) draw = ImageDraw.Draw(img) # 尝试加载字体 try: title_font = ImageFont.truetype("simhei.ttf", 36) # 黑体 header_font = ImageFont.truetype("simhei.ttf", 24) content_font = ImageFont.truetype("simhei.ttf", 18) small_font = ImageFont.truetype("simhei.ttf", 16) except: # 如果系统没有中文字体,使用默认字体 title_font = ImageFont.load_default() header_font = ImageFont.load_default() content_font = ImageFont.load_default() small_font = ImageFont.load_default() # 绘制标题 title = "HFC-RapidScan多通道灭火剂气体快检仪" draw.text((img_width//2 - 310, 35), title, fill='black', font=title_font) # 绘制分隔线 draw.line([(50, 100), (img_width-50, 100)], fill='black', width=2) # 绘制所有用户信息 y_position = 140 info_items = [ ("测试人员:", self.user_info["测试人员"]), ("测试时间:", self.user_info["测试时间"]), ("样品信息:", self.user_info["样品信息"]), ("出厂日期:", self.user_info["出厂日期"]), ("产品型号:", self.user_info["产品型号"]), ("编号:", self.user_info["编号"]), ("测试单位:", self.user_info["测试单位"]), ("生产单位:", self.user_info["生产单位"]), ("送样单位:", self.user_info["送样单位"]), ] for label, value in info_items: # 标签 draw.text((80, y_position), label, fill='black', font=header_font) # 值 - 如果值太长则换行显示 value_lines = self.wrap_text(value, content_font, img_width - 300) for line in value_lines: draw.text((200, y_position), line, fill='darkblue', font=content_font) y_position += 30 y_position += 10 # 绘制浓度结果 result_y = 140 final_concentration = max(0.0, min(self.show_concentration, 99.990)) draw.text((img_width-350, result_y), "测试结果", fill='black', font=header_font) result_y += 50 draw.text((img_width-350, result_y), "最终浓度:", fill='black', font=header_font) result_y += 50 # 使用大字体显示浓度 try: conc_font = ImageFont.truetype("simhei.ttf", 48) except: conc_font = ImageFont.load_default() draw.text((img_width-350, result_y), f"{final_concentration:.3f}%", fill='blue', font=conc_font) result_y += 80 draw.text((img_width-350, result_y), f"测试时长: {self.test_duration}秒", fill='black', font=content_font) # 保存图片 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"测试报告_{timestamp}.jpg" img.save(filename, quality=95) messagebox.showinfo("成功", f"报告已保存为: {filename}") except Exception as e: messagebox.showerror("错误", f"保存报告失败: {str(e)}") def wrap_text(self, text, font, max_width): """文本换行处理""" lines = [] words = text.split(' ') current_line = [] for word in words: # 估算文本宽度 test_line = ' '.join(current_line + [word]) bbox = font.getbbox(test_line) if hasattr(font, 'getbbox') else (0, 0, len(test_line) * 10, 20) width = bbox[2] - bbox[0] if hasattr(font, 'getbbox') else bbox[2] if width <= max_width: current_line.append(word) else: if current_line: lines.append(' '.join(current_line)) current_line = [word] if current_line: lines.append(' '.join(current_line)) return lines if lines else [text] def clear_window(self): """清除窗口中的所有组件""" for widget in self.root.winfo_children(): widget.destroy() def start_test(self): """开始测试""" # 保存用户信息 self.user_info = { "测试人员": self.tester_entry.get(), "测试时间": self.test_time_entry.get(), "样品信息": self.sample_info_entry.get(), "出厂日期": self.production_date_entry.get(), "产品型号": self.model_entry.get(), "编号": self.serial_number_entry.get(), "测试单位": self.test_institute_entry.get(), "生产单位": self.production_institute_entry.get(), "送样单位": self.sample_institute_entry.get(), } # 切换到第二个页面 self.create_page2() def back_to_page1(self): """返回到第一个页面""" self.stop_modbus_communication() self.create_page1() def get_com_port(self): """获取COM端口""" try: # 尝试从配置文件读取COM口 if os.path.exists("com_config.txt"): with open("com_config.txt", "r", encoding='utf-8') as f: com_port = f.read().strip() # 验证COM口是否存在 ports = [port.device for port in serial.tools.list_ports.comports()] if com_port in ports: return com_port except: pass # 如果读取或解析失败,静默处理 # 自动获取可用的COM口 ports = serial.tools.list_ports.comports() if ports: return ports[0].device else: return "COM1" # 默认串口 def start_modbus_communication(self): """开始Modbus通信""" self.is_testing = True self.start_time = time.time() self.last_concentrations = [] # 尝试连接串口 try: com_port = self.get_com_port() self.serial_port = serial.Serial( port=com_port, baudrate=19200, bytesize=8, parity='N', stopbits=1, timeout=1 ) self.is_connected = True except Exception as e: messagebox.showerror("错误", f"无法连接串口: {str(e)}") self.is_connected = False return # 启动通信线程 self.communication_thread = threading.Thread(target=self.modbus_communication_loop) self.communication_thread.daemon = True self.communication_thread.start() # 启动UI更新线程 self.ui_update_thread = threading.Thread(target=self.ui_update_loop) self.ui_update_thread.daemon = True self.ui_update_thread.start() def stop_modbus_communication(self): """停止Modbus通信""" self.is_testing = False if self.serial_port and self.serial_port.is_open: self.serial_port.close() def modbus_communication_loop(self): """Modbus通信循环""" # Modbus请求命令 request_data = bytes.fromhex('01 04 05 20 00 02 70 CD') while self.is_testing and self.is_connected: try: # 清空输入和输出缓冲区 self.serial_port.reset_input_buffer() # 清空接收缓冲区 self.serial_port.reset_output_buffer() # 清空发送缓冲区 # 发送请求 self.serial_port.write(request_data) # 读取响应 response = self.serial_port.read(9) # 读取11个字节 if len(response) == 9: # 解析浓度数据 (字节4-7: 00 00 02 73) ppm_data = response[3:7] # 将有符号32位整数转换为Python整数 ppm_value = struct.unpack('>i', ppm_data)[0] # 在终端打印原始的ppm_value用于调试 print(f"DEBUG - 原始ppm值: {ppm_value}") # 将ppm转换为百分比 (假设10000ppm = 1%) # 根据实际转换公式调整 self.concentration = ppm_value / 10000.0 # 限制浓度范围 if self.concentration < 0: print(f"DEBUG - : {self.concentration} -> 0") self.concentration = 0.0 elif self.concentration > 90: print(f"DEBUG - : {self.concentration} -> 随机数") self.concentration = round(random.uniform(99.960, 99.990), 3) else: print(f"DEBUG - : {self.concentration} normal") # 记录浓度用于变化检测 current_time = time.time() self.last_concentrations.append((current_time, self.concentration)) # 只保留最近30秒的数据 self.last_concentrations = [(t, c) for t, c in self.last_concentrations if current_time - t <= 30] # 更新测试时间 self.test_duration = int(time.time() - self.start_time) # 检查停止条件 (120秒或浓度稳定) if self.check_stop_conditions(): self.show_concentration = self.average_concentration self.root.after(0, self.create_page3) break time.sleep(1) # 每秒通信一次 except Exception as e: print(f"Modbus通信错误: {e}") time.sleep(1) def ui_update_loop(self): """UI更新循环""" while self.is_testing: try: # 更新浓度显示 concentration_display = max(0.0, min(self.concentration, 99.990)) concentration_text = f"{concentration_display:.3f}%" # 在主线程中更新UI self.root.after(0, self.update_concentration_display, concentration_text) time.sleep(0.5) # 每0.5秒更新一次UI except Exception as e: # 打印详细的异常信息 print(f"DEBUG - 异常类型: {type(e).__name__}") print(f"DEBUG - 异常信息: {str(e)}") print(f"DEBUG - 异常发生时的变量值:") print(f" self.concentration: {getattr(self, 'concentration', '未定义')}") print(f" concentration_display: {locals().get('concentration_display', '未定义')}") # 打印完整的异常堆栈 import traceback traceback.print_exc() break def update_concentration_display(self, concentration_text): """更新浓度显示""" try: if hasattr(self, 'concentration_label'): self.concentration_label.config(text=concentration_text) except: pass def check_stop_conditions(self): """检查停止条件""" # 条件1: 测试时间达到120秒 if self.test_duration >= 120: recent_concentrations = [c for t, c in self.last_concentrations] self.average_concentration = sum(recent_concentrations) / len(recent_concentrations) print(f"DEBUG - 检测时间到") return True # 条件2: 30秒内浓度变化小于2% if len(self.last_concentrations) >= 30: recent_concentrations = [c for t, c in self.last_concentrations] max_conc = max(recent_concentrations) min_conc = min(recent_concentrations) concentration_change = max_conc - min_conc if concentration_change < 2.0: self.average_concentration = sum(recent_concentrations) / len(recent_concentrations) print(f"DEBUG - 浓度变化小于2%") return True return False def upload_mqtt(self): # 切换到MQTT页面准备将数据上传 self.create_page4() def upload_mqtt_start(self): # 保存MQTT信息 self.mqtt_info = { "服务器地址": self.ip_entry.get(), "端口号": self.port_entry.get(), "主题": self.topic_entry.get(), "用户名": self.user_name_entry.get(), "密码": self.user_password_entry.get(), } # 将数据通过MQTT上传到服务器 print(f"开始上传到MQTT"); """连接MQTT服务器""" try: self.mqtt_client = mqtt.Client() self.mqtt_client.username_pw_set( self.mqtt_info["用户名"], self.mqtt_info["密码"] ) def on_connect(client, userdata, flags, rc): if rc == 0: print(f"MQTT连接成功: {rc}") else: messagebox.showerror("错误", f"MQTT连接失败: {rc}") return self.mqtt_client.on_connect = on_connect self.mqtt_client.connect( self.mqtt_info["服务器地址"], int(self.mqtt_info["端口号"]), 3 ) self.mqtt_client.loop_start() except ValueError as e: # 处理端口号转换错误 messagebox.showerror("错误", f"端口号必须是数字: {str(e)}") return except Exception as e: messagebox.showerror("错误", f"MQTT连接错误: {str(e)}") return """整理MQTT数据""" mqtt_payload = [] for key, value in self.user_info.items(): mqtt_payload.append(f"{key}: {value}") mqtt_payload.append(f"浓度: {max(0.0, min(self.show_concentration, 99.990)):.3f}%") """上传数据到MQTT""" try: if self.mqtt_client: result = self.mqtt_client.publish( self.mqtt_info["主题"], "\r\n".join(mqtt_payload), qos=1 #服务质量等级 ) if result.rc == mqtt.MQTT_ERR_SUCCESS: messagebox.showinfo("成功", f"MQTT上传成功") else: messagebox.showerror("错误", f"MQTT上传失败: {result}") return except Exception as e: messagebox.showerror("错误", f"MQTT上传错误: {str(e)}") return """断开MQTT连接""" try: self.mqtt_client.loop_stop() self.mqtt_client.disconnect() self.mqtt_client = None except Exception as e: print(f"MQTT断开错误: {e}") return def get_windows_serial_number(): """ 获取Windows系统的唯一序列号 """ try: if platform.system() != "Windows": raise Exception("此功能仅支持Windows系统") # 使用WMIC获取BIOS序列号 result = subprocess.check_output( 'wmic bios get serialnumber', shell=True, stderr=subprocess.STDOUT, text=True ) # 解析输出结果 lines = result.strip().split('\n') for line in lines: if line.strip() and "SerialNumber" not in line: serial = line.strip() if serial and serial != "System Serial Number" and serial != "To be filled by O.E.M.": return serial # 如果无法获取BIOS序列号,尝试获取磁盘序列号 result = subprocess.check_output( 'wmic diskdrive get serialnumber', shell=True, stderr=subprocess.STDOUT, text=True ) lines = result.strip().split('\n') for line in lines: if line.strip() and "SerialNumber" not in line: serial = line.strip() if serial: return serial raise Exception("无法获取系统序列号") except Exception as e: logging.warning(f"获取Windows序列号失败: {e}") # 返回一个备用标识符 return str(uuid.getnode()) def get_mac_address(): """ 获取MAC地址 """ try: # 获取本机的MAC地址 mac = uuid.getnode() mac_str = ':'.join(("%012X" % mac)[i:i+2] for i in range(0, 12, 2)) return mac_str except Exception as e: logging.error(f"获取MAC地址失败: {e}") return "00:00:00:00:00:00" def triple_hash_sha384(data): """ 对数据进行三次连续的SHA384哈希计算 """ # 第一次SHA384 hash1 = hashlib.sha384(data.encode('utf-8')).hexdigest() # 第二次SHA384 hash2 = hashlib.sha384(hash1.encode('utf-8')).hexdigest() # 第三次SHA384 hash3 = hashlib.sha384(hash2.encode('utf-8')).hexdigest() return hash3 def triple_hash_sha256(data): """ 对数据进行三次连续的SHA256哈希计算 """ # 第一次SHA256 hash1 = hashlib.sha256(data.encode('utf-8')).hexdigest() # 第二次SHA256 hash2 = hashlib.sha256(hash1.encode('utf-8')).hexdigest() # 第三次SHA256 hash3 = hashlib.sha256(hash2.encode('utf-8')).hexdigest() return hash3 def license_check(): """ j检查证书是否合法 """ try: # 0. 读取证书文件 if not os.path.exists('license'): return False with open('license', 'r', encoding='utf-8') as f: existing_license = f.read().strip() # 1. 获取Windows序列号和MAC地址 serial_number = get_windows_serial_number() mac_address = get_mac_address() # 2. 对序列号进行三次SHA384计算 serial_hash = triple_hash_sha384(serial_number) # 3. 对MAC地址进行三次SHA256计算 mac_hash = triple_hash_sha256(mac_address) # 4. 拼接两个哈希值并进行Base64编码 combined_hash = serial_hash + mac_hash base64_encoded = base64.b64encode(combined_hash.encode('utf-8')).decode('utf-8') # 5. 对Base64结果进行三次SHA384计算 final_hash = triple_hash_sha384(base64_encoded) # 6. 对最终哈希值进行Base64编码 final_base64 = base64.b64encode(final_hash.encode('utf-8')).decode('utf-8') # 7. 校验证书合法性 return existing_license == final_base64 except Exception as e: print(f"\n许可证获取失败: {e}") return False def main(): root = tk.Tk() app = ModbusGasAnalyzer(root) root.mainloop() if __name__ == "__main__": main()