# demo/python/mqtt/mqtt_lark.py
#
# 557 lines
# 24 KiB
# Python
# Raw Normal View History
#
# 2025-11-16 20:28:02 +08:00
import tkinter as tk
from tkinter import ttk, messagebox, font
import serial
import serial.tools.list_ports
import threading
import time
import struct
import paho.mqtt.client as mqtt
from datetime import datetime
import json
import logging
import binascii
# Configure logging: INFO level and above, formatted as "time - level - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by every method of the application class below.
logger = logging.getLogger(__name__)
class SensorMQTTApp:
    """Tkinter application that polls a gas sensor and publishes readings to MQTT.

    The window lets the user pick a serial port, poll the sensor with a
    Modbus "read input registers" request at a fixed interval, show the
    latest concentration, keep a short on-screen history, and optionally
    publish every reading to an MQTT topic.

    Threads involved:
      * the Tk main loop (all button callbacks),
      * one daemon thread per collection run (``collection_loop``),
      * one short-lived daemon thread per MQTT connection test,
      * the paho network thread started by ``loop_start``.

    NOTE(review): like the original code, the collection thread and the paho
    ``on_connect`` callback update widgets directly from their own threads;
    only the MQTT test thread hands results back through ``root.after``.
    """

    # Modbus function code used for both the request and the expected reply.
    _READ_INPUT_REGISTERS = 0x04
    # Expected reply size: address, function, byte count, 4 data bytes, 2 CRC bytes.
    RESPONSE_LENGTH = 9
    # Maximum number of entries kept in the on-screen history box.
    MAX_HISTORY_LINES = 10
    # Font shared by all small captions.
    _LABEL_FONT = ('Arial', 9)

    def __init__(self, root):
        """Set up the window, initialise runtime state and build the UI.

        Args:
            root: the ``tk.Tk`` root window that hosts the application.
        """
        self.root = root
        self.root.title("气体浓度监测系统")
        self.root.geometry("800x600")
        self.root.configure(bg='#f0f0f0')

        # Runtime state.
        self.serial_port = None
        self.is_collecting = False
        self.is_uploading = False
        self.sensor_responded = False
        self.upload_success = False
        self.data_points = []
        self.sample_interval = 5
        self.current_concentration = 0
        self.collection_thread = None
        # Event owned by the current collection run; set to stop that run only.
        self._stop_event = None

        # MQTT client used for uploads (None while disconnected).
        self.mqtt_client = None
        self.mqtt_testing = False

        self.setup_ui()
        self.refresh_serial_ports()

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def setup_ui(self):
        """Build every widget of the main window and configure grid weights."""
        style = ttk.Style()
        style.theme_use('clam')

        main_frame = ttk.Frame(self.root, padding="8")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        title_label = tk.Label(main_frame, text="气体浓度实时监测系统",
                               font=('Arial', 16, 'bold'), fg='#2c3e50', bg='#f0f0f0')
        title_label.grid(row=0, column=0, columnspan=3, pady=(0, 10))

        # Three-column configuration area.
        config_frame = ttk.Frame(main_frame)
        config_frame.grid(row=1, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N), pady=(0, 10))

        serial_frame = self._build_serial_frame(config_frame)
        mqtt_frame = self._build_mqtt_frame(config_frame)
        right_column_frame = self._build_right_column(config_frame)
        display_frame = self._build_display_frame(main_frame)
        history_frame = self._build_history_frame(main_frame)

        # Resize weights.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.columnconfigure(1, weight=1)
        main_frame.columnconfigure(2, weight=1)
        main_frame.rowconfigure(2, weight=1)  # live concentration area
        main_frame.rowconfigure(3, weight=1)  # history area
        config_frame.columnconfigure(0, weight=1)
        config_frame.columnconfigure(1, weight=2)  # cloud settings column is wider
        config_frame.columnconfigure(2, weight=1)
        serial_frame.columnconfigure(1, weight=1)
        mqtt_frame.columnconfigure(1, weight=1)
        right_column_frame.columnconfigure(0, weight=1)
        display_frame.columnconfigure(0, weight=1)
        history_frame.columnconfigure(0, weight=1)
        history_frame.rowconfigure(0, weight=1)

    def _add_row_label(self, parent, row, caption):
        """Place a left-aligned caption in column 0 of *row*."""
        ttk.Label(parent, text=caption, font=self._LABEL_FONT).grid(
            row=row, column=0, sticky=tk.W, pady=3)

    @staticmethod
    def _place_field(widget, row):
        """Grid an input widget in column 1 of *row*, next to its caption."""
        widget.grid(row=row, column=1, sticky=tk.W, pady=3, padx=(5, 0))

    def _add_entry_row(self, parent, row, caption, default, **entry_options):
        """Add a caption plus text entry and return the entry's ``StringVar``."""
        self._add_row_label(parent, row, caption)
        variable = tk.StringVar(value=default)
        entry = ttk.Entry(parent, textvariable=variable, width=20, **entry_options)
        self._place_field(entry, row)
        return variable

    def _add_status_row(self, parent, caption, color, text):
        """Add one "caption / coloured indicator / text" row; return (indicator, text label)."""
        row_frame = ttk.Frame(parent)
        row_frame.pack(fill=tk.X, pady=3)
        ttk.Label(row_frame, text=caption, font=self._LABEL_FONT).pack(side=tk.LEFT)
        # NOTE(review): the indicator text is empty in the source, so only its
        # foreground colour changes; presumably a glyph was intended - confirm.
        indicator = tk.Label(row_frame, text="", fg=color, font=("Arial", 12, "bold"))
        indicator.pack(side=tk.LEFT, padx=(5, 5))
        description = ttk.Label(row_frame, text=text, font=self._LABEL_FONT)
        description.pack(side=tk.LEFT)
        return indicator, description

    def _build_serial_frame(self, parent):
        """Create the serial-port settings panel (port, baud rate, address, interval)."""
        frame = ttk.LabelFrame(parent, text="串口设置", padding="8")
        frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(0, 5))

        self._add_row_label(frame, 0, "串口号:")
        self.port_var = tk.StringVar()
        self.port_combo = ttk.Combobox(frame, textvariable=self.port_var,
                                       width=15, state='readonly')
        self._place_field(self.port_combo, 0)

        self._add_row_label(frame, 1, "波特率:")
        self.baudrate_var = tk.StringVar(value="19200")
        baudrate_combo = ttk.Combobox(frame, textvariable=self.baudrate_var,
                                      values=["9600", "19200", "38400", "57600", "115200"],
                                      width=15, state='readonly')
        self._place_field(baudrate_combo, 1)

        self._add_row_label(frame, 2, "设备地址:")
        self.address_var = tk.StringVar(value="1")
        address_spin = ttk.Spinbox(frame, from_=0, to=255,
                                   textvariable=self.address_var, width=15)
        self._place_field(address_spin, 2)

        self._add_row_label(frame, 3, "采样间隔(秒):")
        self.interval_var = tk.StringVar(value="5")
        interval_spin = ttk.Spinbox(frame, from_=1, to=3600,
                                    textvariable=self.interval_var, width=15)
        self._place_field(interval_spin, 3)
        return frame

    def _build_mqtt_frame(self, parent):
        """Create the cloud (MQTT) settings panel."""
        frame = ttk.LabelFrame(parent, text="云服务设置", padding="8")
        frame.grid(row=0, column=1, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5))

        # NOTE(review): broker address and credentials are hard-coded defaults;
        # consider loading them from configuration instead of shipping them in source.
        self.mqtt_broker_var = self._add_entry_row(frame, 0, "服务器地址:", "122.112.229.121")
        self.mqtt_port_var = self._add_entry_row(frame, 1, "端口:", "1883")
        self.mqtt_topic_var = self._add_entry_row(frame, 2, "主题:", "xiaofang/test")
        self.mqtt_username_var = self._add_entry_row(frame, 3, "用户名:", "xiaofang")
        self.mqtt_password_var = self._add_entry_row(frame, 4, "密码:", "xiaofang@qwer", show="*")
        return frame

    def _build_right_column(self, parent):
        """Create the action buttons and the three status indicators."""
        column = ttk.Frame(parent)
        column.grid(row=0, column=2, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5))

        button_container = ttk.LabelFrame(column, text="操作控制", padding="8")
        button_container.grid(row=0, column=2, sticky=(tk.W, tk.E, tk.N), padx=(5, 0))

        self.test_mqtt_btn = ttk.Button(button_container, text="测试连接",
                                        command=self.start_mqtt_test, width=15)
        self.test_mqtt_btn.pack(pady=5)
        self.collect_btn = ttk.Button(button_container, text="开始采集",
                                      command=self.toggle_collection, width=15)
        self.collect_btn.pack(pady=5)
        self.upload_btn = ttk.Button(button_container, text="开始上传",
                                     command=self.toggle_upload, width=15)
        self.upload_btn.pack(pady=5)

        status_frame = ttk.LabelFrame(column, text="系统状态", padding="8")
        status_frame.grid(row=0, column=3, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5))

        self.sensor_status, self.sensor_status_text = self._add_status_row(
            status_frame, "传感器:", "red", "未连接")
        self.upload_status, self.upload_status_text = self._add_status_row(
            status_frame, "云服务:", "red", "未连接")
        self.mqtt_test_status, self.mqtt_test_status_text = self._add_status_row(
            status_frame, "网 络:", "gray", "未测试")
        return column

    def _build_display_frame(self, parent):
        """Create the large live-concentration read-out with unit and timestamp."""
        frame = ttk.LabelFrame(parent, text="实时浓度监测", padding="12")
        frame.grid(row=2, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))

        self.concentration_var = tk.StringVar(value="0")
        concentration_label = tk.Label(frame, textvariable=self.concentration_var,
                                       font=('Arial', 36, 'bold'), fg='#2c3e50', bg='white',
                                       relief='sunken', bd=2, width=25, height=2)
        concentration_label.pack(pady=10)

        info_frame = ttk.Frame(frame)
        info_frame.pack()
        unit_label = tk.Label(info_frame, text="ppm", font=('Arial', 14), fg='#7f8c8d')
        unit_label.pack(side=tk.LEFT, padx=(0, 20))
        self.update_time_var = tk.StringVar(value="未更新")
        time_label = tk.Label(info_frame, textvariable=self.update_time_var,
                              font=('Arial', 9), fg='#95a5a6')
        time_label.pack(side=tk.LEFT)
        return frame

    def _build_history_frame(self, parent):
        """Create the scrollable text box that lists the most recent readings."""
        frame = ttk.LabelFrame(parent, text="最近数据记录", padding="6")
        frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S))

        self.history_text = tk.Text(frame, height=4, width=60, font=('Consolas', 9),
                                    bg='#f8f9fa', relief='sunken', bd=1)
        scrollbar = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self.history_text.yview)
        self.history_text.configure(yscrollcommand=scrollbar.set)
        self.history_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        return frame

    # ------------------------------------------------------------------
    # Status indicator helpers
    # ------------------------------------------------------------------
    def _set_sensor_status(self, color, text):
        """Update the sensor indicator colour and its caption."""
        self.sensor_status.config(fg=color)
        self.sensor_status_text.config(text=text)

    def _set_upload_status(self, color, text):
        """Update the cloud-upload indicator colour and its caption."""
        self.upload_status.config(fg=color)
        self.upload_status_text.config(text=text)

    def _set_mqtt_test_status(self, color, text):
        """Update the connection-test indicator colour and its caption."""
        self.mqtt_test_status.config(fg=color)
        self.mqtt_test_status_text.config(text=text)

    # ------------------------------------------------------------------
    # Serial / Modbus
    # ------------------------------------------------------------------
    def refresh_serial_ports(self):
        """Fill the port combobox with the available serial ports; select the first."""
        port_list = [port.device for port in serial.tools.list_ports.comports()]
        self.port_combo['values'] = port_list
        if port_list:
            self.port_combo.current(0)

    def calculate_crc16(self, data):
        """Return the Modbus CRC-16 of *data*.

        Uses initial value 0xFFFF and the reflected polynomial 0xA001.

        Args:
            data: iterable of byte values (``bytes``, ``bytearray`` or ints 0-255).

        Returns:
            The 16-bit CRC as an int.
        """
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    def build_modbus_command(self, address):
        """Build the 8-byte read request for the sensor at *address*.

        The frame is: address, function 0x04, register bytes 0x05 0x20,
        quantity bytes 0x00 0x02, followed by the CRC-16 with the low byte first.

        Args:
            address: device address, 0-255.

        Returns:
            The request as a ``bytearray``.
        """
        command = bytearray([
            address, self._READ_INPUT_REGISTERS, 0x05, 0x20, 0x00, 0x02
        ])
        crc = self.calculate_crc16(command)
        command.extend([crc & 0xFF, (crc >> 8) & 0xFF])
        return command

    def parse_sensor_data(self, response):
        """Extract the concentration from a sensor reply.

        A reply is accepted only if it holds at least ``RESPONSE_LENGTH``
        bytes, starts with the configured device address and function 0x04,
        and its CRC-16 (bytes 7-8, low byte first) matches the first seven
        bytes. The value is bytes 3-6 read as a big-endian signed 32-bit integer.

        Args:
            response: raw bytes read from the serial port.

        Returns:
            The concentration as an int, or None if the frame is rejected.
        """
        try:
            if len(response) < self.RESPONSE_LENGTH:
                return None
            frame = bytes(response[:self.RESPONSE_LENGTH])
            if frame[0] != int(self.address_var.get()):
                return None
            if frame[1] != self._READ_INPUT_REGISTERS:
                return None
            # Reject corrupted frames instead of displaying garbage values.
            received_crc = frame[7] | (frame[8] << 8)
            if self.calculate_crc16(frame[:7]) != received_crc:
                return None
            return struct.unpack('>i', frame[3:7])[0]
        except Exception as e:
            logger.error(f"解析传感器数据错误: {e}")
            return None

    def read_sensor_data(self):
        """Poll the sensor once and propagate the reading.

        Sends the read request, waits 100 ms, reads whatever arrived and
        parses it. On success the display, the history and (when uploading
        is enabled) MQTT are updated; otherwise the sensor status is marked
        as not responding.

        Returns:
            The concentration as an int, or None on any failure.
        """
        try:
            if not self.serial_port or not self.serial_port.is_open:
                return None

            address = int(self.address_var.get())
            command = self.build_modbus_command(address)
            self.serial_port.reset_input_buffer()  # drop stale bytes from earlier polls
            self.serial_port.write(command)
            time.sleep(0.1)

            concentration = None
            if self.serial_port.in_waiting > 0:
                response = self.serial_port.read(self.serial_port.in_waiting)
                concentration = self.parse_sensor_data(response)

            if concentration is None:
                self.sensor_responded = False
                self._set_sensor_status("red", "无响应")
                return None

            self.current_concentration = concentration
            self.sensor_responded = True
            self._set_sensor_status("green", "已连接")
            self.update_display(concentration)
            self.add_to_history(concentration)
            if self.is_uploading and self.mqtt_client:
                self.upload_to_mqtt(concentration)
            return concentration
        except Exception as e:
            logger.error(f"读取传感器数据错误: {e}")
            self.sensor_responded = False
            self._set_sensor_status("red", "错误")
            return None

    # ------------------------------------------------------------------
    # Display
    # ------------------------------------------------------------------
    def update_display(self, concentration):
        """Show *concentration* in the large read-out and stamp the update time."""
        self.concentration_var.set(str(concentration))
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.update_time_var.set(f"最后更新: {current_time}")

    def add_to_history(self, concentration):
        """Prepend a timestamped reading to the history box.

        At most ``MAX_HISTORY_LINES`` entries are kept; older ones are removed.
        """
        current_time = datetime.now().strftime("%H:%M:%S")
        history_entry = f"{current_time} - {concentration} ppm\n"
        self.history_text.insert('1.0', history_entry)

        # 'end-1c' skips the newline Tk always keeps at the end of the widget,
        # so splitlines() yields exactly one item per stored entry.
        entries = self.history_text.get('1.0', 'end-1c').splitlines()
        if len(entries) > self.MAX_HISTORY_LINES:
            self.history_text.delete(f'{self.MAX_HISTORY_LINES + 1}.0', 'end')

    # ------------------------------------------------------------------
    # MQTT
    # ------------------------------------------------------------------
    def start_mqtt_test(self):
        """Start a background MQTT connection test unless one is already running."""
        if self.mqtt_testing:
            return
        self.mqtt_testing = True
        self.test_mqtt_btn.config(state="disabled", text="测试中...")
        self._set_mqtt_test_status("yellow", "测试中")
        test_thread = threading.Thread(target=self.test_mqtt_connection_thread, daemon=True)
        test_thread.start()

    def test_mqtt_connection_thread(self):
        """Try to connect to the configured broker and report the outcome.

        Runs in a worker thread. Waits up to five seconds for a successful
        ``on_connect`` and then schedules ``on_mqtt_test_success`` or
        ``on_mqtt_test_failure`` on the Tk main loop.
        """
        try:
            client = mqtt.Client()
            connected = threading.Event()

            def on_connect(client, userdata, flags, rc):
                if rc == 0:
                    connected.set()

            client.on_connect = on_connect
            username = self.mqtt_username_var.get()
            password = self.mqtt_password_var.get()
            if username and password:
                client.username_pw_set(username, password)
            client.connect(self.mqtt_broker_var.get(), int(self.mqtt_port_var.get()), 5)
            client.loop_start()
            try:
                succeeded = connected.wait(timeout=5)
            finally:
                # Always stop the network thread, even if waiting was interrupted.
                client.loop_stop()
                client.disconnect()
        except Exception as e:
            # Convert now: the name ``e`` is unbound once the except clause
            # ends, so it must not be referenced from a deferred callback.
            error_msg = str(e)
            self.root.after(0, self.on_mqtt_test_failure, error_msg)
            return

        if succeeded:
            self.root.after(0, self.on_mqtt_test_success)
        else:
            self.root.after(0, self.on_mqtt_test_failure, "连接失败")

    def on_mqtt_test_success(self):
        """Main-loop callback: show a successful connection test."""
        self.mqtt_testing = False
        self.test_mqtt_btn.config(state="normal", text="测试连接")
        self._set_mqtt_test_status("green", "已连接")
        messagebox.showinfo("成功", "MQTT连接测试成功")

    def on_mqtt_test_failure(self, error_msg):
        """Main-loop callback: show a failed connection test with *error_msg*."""
        self.mqtt_testing = False
        self.test_mqtt_btn.config(state="normal", text="测试连接")
        self._set_mqtt_test_status("red", "连接失败")
        messagebox.showerror("错误", f"MQTT连接测试失败: {error_msg}")

    def upload_to_mqtt(self, concentration):
        """Publish *concentration* (as text) to the configured topic and update the status."""
        try:
            if not self.mqtt_client:
                return
            result = self.mqtt_client.publish(
                self.mqtt_topic_var.get(),
                str(concentration)
            )
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                self.upload_success = True
                self._set_upload_status("green", "已连接")
            else:
                self.upload_success = False
                self._set_upload_status("red", "发送失败")
        except Exception as e:
            logger.error(f"MQTT上传错误: {e}")
            self.upload_success = False
            self._set_upload_status("red", "错误")

    def connect_mqtt(self):
        """Connect the upload client to the configured broker.

        On failure the error is logged, shown to the user, and
        ``self.mqtt_client`` is left as None.

        Returns:
            True if the connection was initiated and the network loop
            started, False otherwise.
        """
        try:
            client = mqtt.Client()
            client.username_pw_set(
                self.mqtt_username_var.get(),
                self.mqtt_password_var.get()
            )

            def on_connect(client, userdata, flags, rc):
                if rc == 0:
                    logger.info("MQTT连接成功")
                    self._set_upload_status("green", "已连接")
                else:
                    logger.error(f"MQTT连接失败错误代码: {rc}")
                    self._set_upload_status("red", "连接失败")

            client.on_connect = on_connect
            client.connect(
                self.mqtt_broker_var.get(),
                int(self.mqtt_port_var.get()),
                60
            )
            client.loop_start()
            # Publish the client only once it is usable.
            self.mqtt_client = client
            return True
        except Exception as e:
            self.mqtt_client = None
            logger.error(f"MQTT连接错误: {e}")
            messagebox.showerror("错误", f"MQTT连接失败: {str(e)}")
            return False

    def disconnect_mqtt(self):
        """Stop the network loop, disconnect the upload client and reset the status."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()
            self.mqtt_client = None
            logger.info("MQTT连接已断开")
        self._set_upload_status("red", "未连接")

    # ------------------------------------------------------------------
    # Button actions
    # ------------------------------------------------------------------
    def toggle_collection(self):
        """Start or stop periodic sensor polling (button callback)."""
        if self.is_collecting:
            self.is_collecting = False
            # Wake the polling thread so it exits now rather than after its sleep.
            if self._stop_event is not None:
                self._stop_event.set()
            self.collect_btn.config(text="开始采集")
            if self.serial_port:
                self.serial_port.close()
                self.serial_port = None
            self._set_sensor_status("red", "未连接")
            return

        port = self.port_var.get()
        if not port:
            messagebox.showwarning("警告", "请选择串口号")
            return

        try:
            baudrate = int(self.baudrate_var.get())
            self.serial_port = serial.Serial(port, baudrate, timeout=1)
            self.is_collecting = True
            self.collect_btn.config(text="停止采集")
            self._set_sensor_status("green", "已连接")
            # A fresh event per run keeps a thread left over from an earlier
            # run from being revived when polling is restarted.
            self._stop_event = threading.Event()
            self.collection_thread = threading.Thread(
                target=self.collection_loop, args=(self._stop_event,), daemon=True)
            self.collection_thread.start()
        except Exception as e:
            messagebox.showerror("错误", f"打开串口失败: {str(e)}")
            self.is_collecting = False
            self.collect_btn.config(text="开始采集")
            self._set_sensor_status("red", "错误")

    def toggle_upload(self):
        """Start or stop publishing readings to MQTT (button callback)."""
        if self.is_uploading:
            self.is_uploading = False
            self.upload_btn.config(text="开始上传")
            self.disconnect_mqtt()
            return

        # Enable uploading only when the broker connection was actually set up;
        # connect_mqtt reports its own errors to the user.
        if self.connect_mqtt():
            self.is_uploading = True
            self.upload_btn.config(text="停止上传")
        else:
            self.is_uploading = False
            self.upload_btn.config(text="开始上传")

    def collection_loop(self, stop_event=None):
        """Poll the sensor repeatedly until collection is stopped.

        Args:
            stop_event: optional ``threading.Event``; when given, the loop
                ends as soon as it is set and the pause between polls is
                interruptible. Without it the loop relies on
                ``self.is_collecting`` and ``time.sleep``.
        """
        while self.is_collecting and not (stop_event is not None and stop_event.is_set()):
            try:
                self.read_sensor_data()
                # Clamp to the spinbox minimum so a typed 0 or negative value
                # cannot turn the loop into a busy spin.
                pause = max(1, int(self.interval_var.get()))
            except Exception as e:
                logger.error(f"采集循环错误: {e}")
                pause = 1
            if stop_event is None:
                time.sleep(pause)
            else:
                stop_event.wait(pause)

    def on_closing(self):
        """Release the serial port and MQTT connection, then destroy the window."""
        self.is_collecting = False
        self.is_uploading = False
        if self._stop_event is not None:
            self._stop_event.set()
        try:
            if self.serial_port:
                self.serial_port.close()
            if self.mqtt_client:
                self.disconnect_mqtt()
        finally:
            # The window must close even if cleanup fails.
            self.root.destroy()
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    application = SensorMQTTApp(window)
    # Route the window-manager close button through the app's cleanup handler.
    window.protocol("WM_DELETE_WINDOW", application.on_closing)
    window.mainloop()
# Start the GUI only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()