diff --git a/python/mqtt/mqtt_lark.py b/python/mqtt/mqtt_lark.py new file mode 100644 index 0000000..322bec3 --- /dev/null +++ b/python/mqtt/mqtt_lark.py @@ -0,0 +1,557 @@ +import tkinter as tk +from tkinter import ttk, messagebox, font +import serial +import serial.tools.list_ports +import threading +import time +import struct +import paho.mqtt.client as mqtt +from datetime import datetime +import json +import logging +import binascii + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class SensorMQTTApp: + def __init__(self, root): + self.root = root + self.root.title("气体浓度监测系统") + self.root.geometry("800x600") + self.root.configure(bg='#f0f0f0') + + # 变量初始化 + self.serial_port = None + self.is_collecting = False + self.is_uploading = False + self.sensor_responded = False + self.upload_success = False + self.data_points = [] + self.sample_interval = 5 + self.current_concentration = 0 + + # MQTT客户端 + self.mqtt_client = None + self.mqtt_testing = False + + self.setup_ui() + self.refresh_serial_ports() + + def setup_ui(self): + # 设置主题样式 + style = ttk.Style() + style.theme_use('clam') + + # 主框架 + main_frame = ttk.Frame(self.root, padding="8") + main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) + + # 标题 + title_label = tk.Label(main_frame, text="气体浓度实时监测系统", + font=('Arial', 16, 'bold'), fg='#2c3e50', bg='#f0f0f0') + title_label.grid(row=0, column=0, columnspan=3, pady=(0, 10)) + + # 配置区域(三列布局) + config_frame = ttk.Frame(main_frame) + config_frame.grid(row=1, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N), pady=(0, 10)) + + # 第一列:串口设置 + serial_frame = ttk.LabelFrame(config_frame, text="串口设置", padding="8") + serial_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(0, 5)) + + # 串口号 + ttk.Label(serial_frame, text="串口号:", font=('Arial', 9)).grid(row=0, column=0, sticky=tk.W, pady=3) + self.port_var = tk.StringVar() + self.port_combo = ttk.Combobox(serial_frame, textvariable=self.port_var, width=15, state='readonly') + self.port_combo.grid(row=0, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # 波特率 + ttk.Label(serial_frame, text="波特率:", font=('Arial', 9)).grid(row=1, column=0, sticky=tk.W, pady=3) + self.baudrate_var = tk.StringVar(value="19200") + baudrate_combo = ttk.Combobox(serial_frame, textvariable=self.baudrate_var, + values=["9600", "19200", "38400", "57600", "115200"], + width=15, state='readonly') + baudrate_combo.grid(row=1, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # 设备地址 + ttk.Label(serial_frame, text="设备地址:", font=('Arial', 9)).grid(row=2, column=0, sticky=tk.W, pady=3) + self.address_var = tk.StringVar(value="1") + address_spin = ttk.Spinbox(serial_frame, from_=0, to=255, textvariable=self.address_var, width=15) + address_spin.grid(row=2, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # 采样间隔 + ttk.Label(serial_frame, text="采样间隔(秒):", font=('Arial', 9)).grid(row=3, column=0, sticky=tk.W, pady=3) + self.interval_var = tk.StringVar(value="5") + interval_spin = ttk.Spinbox(serial_frame, from_=1, to=3600, textvariable=self.interval_var, width=15) + interval_spin.grid(row=3, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # 第二列:云服务设置(加宽文本框) + mqtt_frame = ttk.LabelFrame(config_frame, text="云服务设置", padding="8") + mqtt_frame.grid(row=0, column=1, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5)) + + # MQTT服务器地址 + ttk.Label(mqtt_frame, text="服务器地址:", font=('Arial', 9)).grid(row=0, column=0, sticky=tk.W, pady=3) + self.mqtt_broker_var = tk.StringVar(value="122.112.229.121") + ttk.Entry(mqtt_frame, textvariable=self.mqtt_broker_var, width=20).grid(row=0, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # MQTT端口 + ttk.Label(mqtt_frame, text="端口:", font=('Arial', 9)).grid(row=1, column=0, sticky=tk.W, pady=3) + self.mqtt_port_var = tk.StringVar(value="1883") + ttk.Entry(mqtt_frame, textvariable=self.mqtt_port_var, width=20).grid(row=1, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # MQTT主题(加宽文本框) + ttk.Label(mqtt_frame, text="主题:", font=('Arial', 9)).grid(row=2, column=0, sticky=tk.W, pady=3) + self.mqtt_topic_var = tk.StringVar(value="xiaofang/test") + ttk.Entry(mqtt_frame, textvariable=self.mqtt_topic_var, width=20).grid(row=2, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # MQTT用户名 + ttk.Label(mqtt_frame, text="用户名:", font=('Arial', 9)).grid(row=3, column=0, sticky=tk.W, pady=3) + self.mqtt_username_var = tk.StringVar(value="xiaofang") + ttk.Entry(mqtt_frame, textvariable=self.mqtt_username_var, width=20).grid(row=3, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # MQTT密码 + ttk.Label(mqtt_frame, text="密码:", font=('Arial', 9)).grid(row=4, column=0, sticky=tk.W, pady=3) + self.mqtt_password_var = tk.StringVar(value="xiaofang@qwer") + ttk.Entry(mqtt_frame, textvariable=self.mqtt_password_var, show="*", width=20).grid(row=4, column=1, sticky=tk.W, pady=3, padx=(5, 0)) + + # 第三列:按钮容器和系统状态 + right_column_frame = ttk.Frame(config_frame) + right_column_frame.grid(row=0, column=2, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5)) + + # 按钮容器(上下顺序排列) + button_container = ttk.LabelFrame(right_column_frame, text="操作控制", padding="8") + button_container.grid(row=0, column=2, sticky=(tk.W, tk.E, tk.N), padx=(5, 0)) + + # 测试MQTT连接按钮 + self.test_mqtt_btn = ttk.Button(button_container, text="测试连接", command=self.start_mqtt_test, width=15) + self.test_mqtt_btn.pack(pady=5) + + # 数据采集按钮 + self.collect_btn = ttk.Button(button_container, text="开始采集", command=self.toggle_collection, width=15) + self.collect_btn.pack(pady=5) + + # 数据上传按钮 + self.upload_btn = ttk.Button(button_container, text="开始上传", command=self.toggle_upload, width=15) + self.upload_btn.pack(pady=5) + + # 系统状态(上下排列) + status_frame = ttk.LabelFrame(right_column_frame, text="系统状态", padding="8") + status_frame.grid(row=0, column=3, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5)) + + # 传感器状态(上下排列) + sensor_status_frame = ttk.Frame(status_frame) + sensor_status_frame.pack(fill=tk.X, pady=3) + ttk.Label(sensor_status_frame, text="传感器:", font=('Arial', 9)).pack(side=tk.LEFT) + self.sensor_status = tk.Label(sensor_status_frame, text="●", fg="red", font=("Arial", 12, "bold")) + self.sensor_status.pack(side=tk.LEFT, padx=(5, 5)) + self.sensor_status_text = ttk.Label(sensor_status_frame, text="未连接", font=('Arial', 9)) + self.sensor_status_text.pack(side=tk.LEFT) + + # 云服务状态(上下排列) + upload_status_frame = ttk.Frame(status_frame) + upload_status_frame.pack(fill=tk.X, pady=3) + ttk.Label(upload_status_frame, text="云服务:", font=('Arial', 9)).pack(side=tk.LEFT) + self.upload_status = tk.Label(upload_status_frame, text="●", fg="red", font=("Arial", 12, "bold")) + self.upload_status.pack(side=tk.LEFT, padx=(5, 5)) + self.upload_status_text = ttk.Label(upload_status_frame, text="未连接", font=('Arial', 9)) + self.upload_status_text.pack(side=tk.LEFT) + + # 网络状态(上下排列) + network_status_frame = ttk.Frame(status_frame) + network_status_frame.pack(fill=tk.X, pady=3) + ttk.Label(network_status_frame, text="网 络:", font=('Arial', 9)).pack(side=tk.LEFT) + self.mqtt_test_status = tk.Label(network_status_frame, text="●", fg="gray", font=("Arial", 12, "bold")) + self.mqtt_test_status.pack(side=tk.LEFT, padx=(5, 5)) + self.mqtt_test_status_text = ttk.Label(network_status_frame, text="未测试", font=('Arial', 9)) + self.mqtt_test_status_text.pack(side=tk.LEFT) + + # 浓度显示区域(跨三列,和数据记录一样宽) + display_frame = ttk.LabelFrame(main_frame, text="实时浓度监测", padding="12") + display_frame.grid(row=2, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10)) + + # 浓度值显示(超大字体,宽度与数据记录一致) + self.concentration_var = tk.StringVar(value="0") + concentration_label = tk.Label(display_frame, textvariable=self.concentration_var, + font=('Arial', 36, 'bold'), fg='#2c3e50', bg='white', + relief='sunken', bd=2, width=25, height=2) + concentration_label.pack(pady=10) + + # 单位标签和更新时间 + info_frame = ttk.Frame(display_frame) + info_frame.pack() + + unit_label = tk.Label(info_frame, text="ppm", font=('Arial', 14), fg='#7f8c8d') + unit_label.pack(side=tk.LEFT, padx=(0, 20)) + + self.update_time_var = tk.StringVar(value="未更新") + time_label = tk.Label(info_frame, textvariable=self.update_time_var, + font=('Arial', 9), fg='#95a5a6') + time_label.pack(side=tk.LEFT) + + # 历史数据区域(跨三列,和浓度显示一样宽) + history_frame = ttk.LabelFrame(main_frame, text="最近数据记录", padding="6") + history_frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S)) + + # 创建历史数据文本框(宽度与浓度显示一致) + self.history_text = tk.Text(history_frame, height=4, width=60, font=('Consolas', 9), + bg='#f8f9fa', relief='sunken', bd=1) + scrollbar = ttk.Scrollbar(history_frame, orient=tk.VERTICAL, command=self.history_text.yview) + self.history_text.configure(yscrollcommand=scrollbar.set) + + self.history_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + + # 配置权重 + self.root.columnconfigure(0, weight=1) + self.root.rowconfigure(0, weight=1) + + main_frame.columnconfigure(0, weight=1) + main_frame.columnconfigure(1, weight=1) + main_frame.columnconfigure(2, weight=1) + main_frame.rowconfigure(2, weight=1) # 浓度显示区域 + main_frame.rowconfigure(3, weight=1) # 历史数据区域 + + config_frame.columnconfigure(0, weight=1) + config_frame.columnconfigure(1, weight=2) # 云服务设置更宽 + config_frame.columnconfigure(2, weight=1) + + serial_frame.columnconfigure(1, weight=1) + mqtt_frame.columnconfigure(1, weight=1) + right_column_frame.columnconfigure(0, weight=1) + + display_frame.columnconfigure(0, weight=1) + history_frame.columnconfigure(0, weight=1) + history_frame.rowconfigure(0, weight=1) + + def refresh_serial_ports(self): + """刷新可用串口列表(只在启动时调用一次)""" + ports = serial.tools.list_ports.comports() + port_list = [port.device for port in ports] + self.port_combo['values'] = port_list + if port_list: + self.port_combo.current(0) + + def calculate_crc16(self, data): + """计算Modbus CRC16校验""" + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: + crc = (crc >> 1) ^ 0xA001 + else: + crc = crc >> 1 + return crc + + def start_mqtt_test(self): + """启动MQTT连接测试""" + if self.mqtt_testing: + return + + self.mqtt_testing = True + self.test_mqtt_btn.config(state="disabled", text="测试中...") + self.mqtt_test_status.config(fg="yellow") + self.mqtt_test_status_text.config(text="测试中") + + test_thread = threading.Thread(target=self.test_mqtt_connection_thread, daemon=True) + test_thread.start() + + def test_mqtt_connection_thread(self): + """在单独线程中测试MQTT连接""" + try: + client = mqtt.Client() + connected = False + + def on_connect(client, userdata, flags, rc): + nonlocal connected + connected = (rc == 0) + + client.on_connect = on_connect + + username = self.mqtt_username_var.get() + password = self.mqtt_password_var.get() + if username and password: + client.username_pw_set(username, password) + + client.connect(self.mqtt_broker_var.get(), int(self.mqtt_port_var.get()), 5) + client.loop_start() + + timeout = 5 + start_time = time.time() + while not connected and time.time() - start_time < timeout: + time.sleep(0.1) + + client.loop_stop() + client.disconnect() + + if connected: + self.root.after(0, self.on_mqtt_test_success) + else: + self.root.after(0, lambda: self.on_mqtt_test_failure("连接失败")) + + except Exception as e: + self.root.after(0, lambda: self.on_mqtt_test_failure(str(e))) + + def on_mqtt_test_success(self): + """MQTT测试成功回调""" + self.mqtt_testing = False + self.test_mqtt_btn.config(state="normal", text="测试连接") + self.mqtt_test_status.config(fg="green") + self.mqtt_test_status_text.config(text="已连接") + messagebox.showinfo("成功", "MQTT连接测试成功!") + + def on_mqtt_test_failure(self, error_msg): + """MQTT测试失败回调""" + self.mqtt_testing = False + self.test_mqtt_btn.config(state="normal", text="测试连接") + self.mqtt_test_status.config(fg="red") + self.mqtt_test_status_text.config(text="连接失败") + messagebox.showerror("错误", f"MQTT连接测试失败: {error_msg}") + + def build_modbus_command(self, address): + """构建Modbus读取命令""" + command = bytearray([ + address, 0x04, 0x05, 0x20, 0x00, 0x02 + ]) + crc = self.calculate_crc16(command) + command.extend([crc & 0xFF, (crc >> 8) & 0xFF]) + return command + + def parse_sensor_data(self, response): + """解析传感器返回数据""" + try: + if len(response) >= 9: + if response[0] == int(self.address_var.get()) and response[1] == 0x04: + data_bytes = response[3:7] + concentration = struct.unpack('>i', data_bytes)[0] + return concentration + return None + except Exception as e: + logger.error(f"解析传感器数据错误: {e}") + return None + + def read_sensor_data(self): + """读取传感器数据""" + try: + if not self.serial_port or not self.serial_port.is_open: + return None + + address = int(self.address_var.get()) + command = self.build_modbus_command(address) + + self.serial_port.reset_input_buffer() + self.serial_port.write(command) + time.sleep(0.1) + + if self.serial_port.in_waiting > 0: + response = self.serial_port.read(self.serial_port.in_waiting) + concentration = self.parse_sensor_data(response) + + if concentration is not None: + self.current_concentration = concentration + self.sensor_responded = True + self.sensor_status.config(fg="green") + self.sensor_status_text.config(text="已连接") + + # 更新显示 + self.update_display(concentration) + + # 添加到历史数据 + self.add_to_history(concentration) + + # 如果上传功能开启,上传数据 + if self.is_uploading and self.mqtt_client: + self.upload_to_mqtt(concentration) + + return concentration + else: + self.sensor_responded = False + self.sensor_status.config(fg="red") + self.sensor_status_text.config(text="无响应") + else: + self.sensor_responded = False + self.sensor_status.config(fg="red") + self.sensor_status_text.config(text="无响应") + + except Exception as e: + logger.error(f"读取传感器数据错误: {e}") + self.sensor_responded = False + self.sensor_status.config(fg="red") + self.sensor_status_text.config(text="错误") + + return None + + def update_display(self, concentration): + """更新显示""" + self.concentration_var.set(str(concentration)) + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.update_time_var.set(f"最后更新: {current_time}") + + def add_to_history(self, concentration): + """添加到历史记录""" + current_time = datetime.now().strftime("%H:%M:%S") + history_entry = f"{current_time} - {concentration} ppm\n" + + # 添加到文本框开头 + self.history_text.insert('1.0', history_entry) + + # 限制历史记录条数 + lines = self.history_text.get('1.0', 'end').count('\n') + if lines > 10: + self.history_text.delete('10.0', 'end') + + def upload_to_mqtt(self, concentration): + """上传数据到MQTT""" + try: + if self.mqtt_client: + result = self.mqtt_client.publish( + self.mqtt_topic_var.get(), + str(concentration) + ) + + if result.rc == mqtt.MQTT_ERR_SUCCESS: + self.upload_success = True + self.upload_status.config(fg="green") + self.upload_status_text.config(text="已连接") + else: + self.upload_success = False + self.upload_status.config(fg="red") + self.upload_status_text.config(text="发送失败") + + except Exception as e: + logger.error(f"MQTT上传错误: {e}") + self.upload_success = False + self.upload_status.config(fg="red") + self.upload_status_text.config(text="错误") + + def connect_mqtt(self): + """连接MQTT服务器""" + try: + self.mqtt_client = mqtt.Client() + self.mqtt_client.username_pw_set( + self.mqtt_username_var.get(), + self.mqtt_password_var.get() + ) + + def on_connect(client, userdata, flags, rc): + if rc == 0: + logger.info("MQTT连接成功") + self.upload_status.config(fg="green") + self.upload_status_text.config(text="已连接") + else: + logger.error(f"MQTT连接失败,错误代码: {rc}") + self.upload_status.config(fg="red") + self.upload_status_text.config(text="连接失败") + + self.mqtt_client.on_connect = on_connect + self.mqtt_client.connect( + self.mqtt_broker_var.get(), + int(self.mqtt_port_var.get()), + 60 + ) + self.mqtt_client.loop_start() + + except Exception as e: + logger.error(f"MQTT连接错误: {e}") + messagebox.showerror("错误", f"MQTT连接失败: {str(e)}") + + def disconnect_mqtt(self): + """断开MQTT连接""" + if self.mqtt_client: + self.mqtt_client.loop_stop() + self.mqtt_client.disconnect() + self.mqtt_client = None + logger.info("MQTT连接已断开") + self.upload_status.config(fg="red") + self.upload_status_text.config(text="未连接") + + def toggle_collection(self): + """切换数据采集状态""" + if not self.is_collecting: + try: + port = self.port_var.get() + if not port: + messagebox.showwarning("警告", "请选择串口号") + return + + baudrate = int(self.baudrate_var.get()) + self.serial_port = serial.Serial(port, baudrate, timeout=1) + self.is_collecting = True + self.collect_btn.config(text="停止采集") + + # 更新状态显示 + self.sensor_status.config(fg="green") + self.sensor_status_text.config(text="已连接") + + # 启动采集线程 + self.collection_thread = threading.Thread(target=self.collection_loop, daemon=True) + self.collection_thread.start() + + except Exception as e: + messagebox.showerror("错误", f"打开串口失败: {str(e)}") + self.is_collecting = False + self.collect_btn.config(text="开始采集") + self.sensor_status.config(fg="red") + self.sensor_status_text.config(text="错误") + else: + self.is_collecting = False + self.collect_btn.config(text="开始采集") + if self.serial_port: + self.serial_port.close() + self.serial_port = None + + # 更新状态显示 + self.sensor_status.config(fg="red") + self.sensor_status_text.config(text="未连接") + + def toggle_upload(self): + """切换数据上传状态""" + if not self.is_uploading: + try: + self.connect_mqtt() + self.is_uploading = True + self.upload_btn.config(text="停止上传") + + except Exception as e: + messagebox.showerror("错误", f"启动上传失败: {str(e)}") + self.is_uploading = False + self.upload_btn.config(text="开始上传") + else: + self.is_uploading = False + self.upload_btn.config(text="开始上传") + self.disconnect_mqtt() + + def collection_loop(self): + """数据采集循环""" + while self.is_collecting: + try: + self.read_sensor_data() + interval = int(self.interval_var.get()) + time.sleep(interval) + except Exception as e: + logger.error(f"采集循环错误: {e}") + time.sleep(1) + + def on_closing(self): + """程序关闭时的清理工作""" + self.is_collecting = False + self.is_uploading = False + + if self.serial_port: + self.serial_port.close() + + if self.mqtt_client: + self.disconnect_mqtt() + + self.root.destroy() + +def main(): + root = tk.Tk() + app = SensorMQTTApp(root) + root.protocol("WM_DELETE_WINDOW", app.on_closing) + root.mainloop() + +if __name__ == "__main__": + main() \ No newline at end of file