demo/python/mqtt/mqtt_lark.py

557 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, messagebox, font
import serial
import serial.tools.list_ports
import threading
import time
import struct
import paho.mqtt.client as mqtt
from datetime import datetime
import json
import logging
import binascii
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SensorMQTTApp:
def __init__(self, root):
self.root = root
self.root.title("气体浓度监测系统")
self.root.geometry("800x600")
self.root.configure(bg='#f0f0f0')
# 变量初始化
self.serial_port = None
self.is_collecting = False
self.is_uploading = False
self.sensor_responded = False
self.upload_success = False
self.data_points = []
self.sample_interval = 5
self.current_concentration = 0
# MQTT客户端
self.mqtt_client = None
self.mqtt_testing = False
self.setup_ui()
self.refresh_serial_ports()
def setup_ui(self):
# 设置主题样式
style = ttk.Style()
style.theme_use('clam')
# 主框架
main_frame = ttk.Frame(self.root, padding="8")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 标题
title_label = tk.Label(main_frame, text="气体浓度实时监测系统",
font=('Arial', 16, 'bold'), fg='#2c3e50', bg='#f0f0f0')
title_label.grid(row=0, column=0, columnspan=3, pady=(0, 10))
# 配置区域(三列布局)
config_frame = ttk.Frame(main_frame)
config_frame.grid(row=1, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N), pady=(0, 10))
# 第一列:串口设置
serial_frame = ttk.LabelFrame(config_frame, text="串口设置", padding="8")
serial_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(0, 5))
# 串口号
ttk.Label(serial_frame, text="串口号:", font=('Arial', 9)).grid(row=0, column=0, sticky=tk.W, pady=3)
self.port_var = tk.StringVar()
self.port_combo = ttk.Combobox(serial_frame, textvariable=self.port_var, width=15, state='readonly')
self.port_combo.grid(row=0, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# 波特率
ttk.Label(serial_frame, text="波特率:", font=('Arial', 9)).grid(row=1, column=0, sticky=tk.W, pady=3)
self.baudrate_var = tk.StringVar(value="19200")
baudrate_combo = ttk.Combobox(serial_frame, textvariable=self.baudrate_var,
values=["9600", "19200", "38400", "57600", "115200"],
width=15, state='readonly')
baudrate_combo.grid(row=1, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# 设备地址
ttk.Label(serial_frame, text="设备地址:", font=('Arial', 9)).grid(row=2, column=0, sticky=tk.W, pady=3)
self.address_var = tk.StringVar(value="1")
address_spin = ttk.Spinbox(serial_frame, from_=0, to=255, textvariable=self.address_var, width=15)
address_spin.grid(row=2, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# 采样间隔
ttk.Label(serial_frame, text="采样间隔(秒):", font=('Arial', 9)).grid(row=3, column=0, sticky=tk.W, pady=3)
self.interval_var = tk.StringVar(value="5")
interval_spin = ttk.Spinbox(serial_frame, from_=1, to=3600, textvariable=self.interval_var, width=15)
interval_spin.grid(row=3, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# 第二列:云服务设置(加宽文本框)
mqtt_frame = ttk.LabelFrame(config_frame, text="云服务设置", padding="8")
mqtt_frame.grid(row=0, column=1, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5))
# MQTT服务器地址
ttk.Label(mqtt_frame, text="服务器地址:", font=('Arial', 9)).grid(row=0, column=0, sticky=tk.W, pady=3)
self.mqtt_broker_var = tk.StringVar(value="122.112.229.121")
ttk.Entry(mqtt_frame, textvariable=self.mqtt_broker_var, width=20).grid(row=0, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# MQTT端口
ttk.Label(mqtt_frame, text="端口:", font=('Arial', 9)).grid(row=1, column=0, sticky=tk.W, pady=3)
self.mqtt_port_var = tk.StringVar(value="1883")
ttk.Entry(mqtt_frame, textvariable=self.mqtt_port_var, width=20).grid(row=1, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# MQTT主题加宽文本框
ttk.Label(mqtt_frame, text="主题:", font=('Arial', 9)).grid(row=2, column=0, sticky=tk.W, pady=3)
self.mqtt_topic_var = tk.StringVar(value="xiaofang/test")
ttk.Entry(mqtt_frame, textvariable=self.mqtt_topic_var, width=20).grid(row=2, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# MQTT用户名
ttk.Label(mqtt_frame, text="用户名:", font=('Arial', 9)).grid(row=3, column=0, sticky=tk.W, pady=3)
self.mqtt_username_var = tk.StringVar(value="xiaofang")
ttk.Entry(mqtt_frame, textvariable=self.mqtt_username_var, width=20).grid(row=3, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# MQTT密码
ttk.Label(mqtt_frame, text="密码:", font=('Arial', 9)).grid(row=4, column=0, sticky=tk.W, pady=3)
self.mqtt_password_var = tk.StringVar(value="xiaofang@qwer")
ttk.Entry(mqtt_frame, textvariable=self.mqtt_password_var, show="*", width=20).grid(row=4, column=1, sticky=tk.W, pady=3, padx=(5, 0))
# 第三列:按钮容器和系统状态
right_column_frame = ttk.Frame(config_frame)
right_column_frame.grid(row=0, column=2, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5))
# 按钮容器(上下顺序排列)
button_container = ttk.LabelFrame(right_column_frame, text="操作控制", padding="8")
button_container.grid(row=0, column=2, sticky=(tk.W, tk.E, tk.N), padx=(5, 0))
# 测试MQTT连接按钮
self.test_mqtt_btn = ttk.Button(button_container, text="测试连接", command=self.start_mqtt_test, width=15)
self.test_mqtt_btn.pack(pady=5)
# 数据采集按钮
self.collect_btn = ttk.Button(button_container, text="开始采集", command=self.toggle_collection, width=15)
self.collect_btn.pack(pady=5)
# 数据上传按钮
self.upload_btn = ttk.Button(button_container, text="开始上传", command=self.toggle_upload, width=15)
self.upload_btn.pack(pady=5)
# 系统状态(上下排列)
status_frame = ttk.LabelFrame(right_column_frame, text="系统状态", padding="8")
status_frame.grid(row=0, column=3, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 5))
# 传感器状态(上下排列)
sensor_status_frame = ttk.Frame(status_frame)
sensor_status_frame.pack(fill=tk.X, pady=3)
ttk.Label(sensor_status_frame, text="传感器:", font=('Arial', 9)).pack(side=tk.LEFT)
self.sensor_status = tk.Label(sensor_status_frame, text="", fg="red", font=("Arial", 12, "bold"))
self.sensor_status.pack(side=tk.LEFT, padx=(5, 5))
self.sensor_status_text = ttk.Label(sensor_status_frame, text="未连接", font=('Arial', 9))
self.sensor_status_text.pack(side=tk.LEFT)
# 云服务状态(上下排列)
upload_status_frame = ttk.Frame(status_frame)
upload_status_frame.pack(fill=tk.X, pady=3)
ttk.Label(upload_status_frame, text="云服务:", font=('Arial', 9)).pack(side=tk.LEFT)
self.upload_status = tk.Label(upload_status_frame, text="", fg="red", font=("Arial", 12, "bold"))
self.upload_status.pack(side=tk.LEFT, padx=(5, 5))
self.upload_status_text = ttk.Label(upload_status_frame, text="未连接", font=('Arial', 9))
self.upload_status_text.pack(side=tk.LEFT)
# 网络状态(上下排列)
network_status_frame = ttk.Frame(status_frame)
network_status_frame.pack(fill=tk.X, pady=3)
ttk.Label(network_status_frame, text="网 络:", font=('Arial', 9)).pack(side=tk.LEFT)
self.mqtt_test_status = tk.Label(network_status_frame, text="", fg="gray", font=("Arial", 12, "bold"))
self.mqtt_test_status.pack(side=tk.LEFT, padx=(5, 5))
self.mqtt_test_status_text = ttk.Label(network_status_frame, text="未测试", font=('Arial', 9))
self.mqtt_test_status_text.pack(side=tk.LEFT)
# 浓度显示区域(跨三列,和数据记录一样宽)
display_frame = ttk.LabelFrame(main_frame, text="实时浓度监测", padding="12")
display_frame.grid(row=2, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
# 浓度值显示(超大字体,宽度与数据记录一致)
self.concentration_var = tk.StringVar(value="0")
concentration_label = tk.Label(display_frame, textvariable=self.concentration_var,
font=('Arial', 36, 'bold'), fg='#2c3e50', bg='white',
relief='sunken', bd=2, width=25, height=2)
concentration_label.pack(pady=10)
# 单位标签和更新时间
info_frame = ttk.Frame(display_frame)
info_frame.pack()
unit_label = tk.Label(info_frame, text="ppm", font=('Arial', 14), fg='#7f8c8d')
unit_label.pack(side=tk.LEFT, padx=(0, 20))
self.update_time_var = tk.StringVar(value="未更新")
time_label = tk.Label(info_frame, textvariable=self.update_time_var,
font=('Arial', 9), fg='#95a5a6')
time_label.pack(side=tk.LEFT)
# 历史数据区域(跨三列,和浓度显示一样宽)
history_frame = ttk.LabelFrame(main_frame, text="最近数据记录", padding="6")
history_frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S))
# 创建历史数据文本框(宽度与浓度显示一致)
self.history_text = tk.Text(history_frame, height=4, width=60, font=('Consolas', 9),
bg='#f8f9fa', relief='sunken', bd=1)
scrollbar = ttk.Scrollbar(history_frame, orient=tk.VERTICAL, command=self.history_text.yview)
self.history_text.configure(yscrollcommand=scrollbar.set)
self.history_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 配置权重
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(0, weight=1)
main_frame.columnconfigure(1, weight=1)
main_frame.columnconfigure(2, weight=1)
main_frame.rowconfigure(2, weight=1) # 浓度显示区域
main_frame.rowconfigure(3, weight=1) # 历史数据区域
config_frame.columnconfigure(0, weight=1)
config_frame.columnconfigure(1, weight=2) # 云服务设置更宽
config_frame.columnconfigure(2, weight=1)
serial_frame.columnconfigure(1, weight=1)
mqtt_frame.columnconfigure(1, weight=1)
right_column_frame.columnconfigure(0, weight=1)
display_frame.columnconfigure(0, weight=1)
history_frame.columnconfigure(0, weight=1)
history_frame.rowconfigure(0, weight=1)
def refresh_serial_ports(self):
"""刷新可用串口列表(只在启动时调用一次)"""
ports = serial.tools.list_ports.comports()
port_list = [port.device for port in ports]
self.port_combo['values'] = port_list
if port_list:
self.port_combo.current(0)
def calculate_crc16(self, data):
"""计算Modbus CRC16校验"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc = crc >> 1
return crc
def start_mqtt_test(self):
"""启动MQTT连接测试"""
if self.mqtt_testing:
return
self.mqtt_testing = True
self.test_mqtt_btn.config(state="disabled", text="测试中...")
self.mqtt_test_status.config(fg="yellow")
self.mqtt_test_status_text.config(text="测试中")
test_thread = threading.Thread(target=self.test_mqtt_connection_thread, daemon=True)
test_thread.start()
def test_mqtt_connection_thread(self):
"""在单独线程中测试MQTT连接"""
try:
client = mqtt.Client()
connected = False
def on_connect(client, userdata, flags, rc):
nonlocal connected
connected = (rc == 0)
client.on_connect = on_connect
username = self.mqtt_username_var.get()
password = self.mqtt_password_var.get()
if username and password:
client.username_pw_set(username, password)
client.connect(self.mqtt_broker_var.get(), int(self.mqtt_port_var.get()), 5)
client.loop_start()
timeout = 5
start_time = time.time()
while not connected and time.time() - start_time < timeout:
time.sleep(0.1)
client.loop_stop()
client.disconnect()
if connected:
self.root.after(0, self.on_mqtt_test_success)
else:
self.root.after(0, lambda: self.on_mqtt_test_failure("连接失败"))
except Exception as e:
self.root.after(0, lambda: self.on_mqtt_test_failure(str(e)))
def on_mqtt_test_success(self):
"""MQTT测试成功回调"""
self.mqtt_testing = False
self.test_mqtt_btn.config(state="normal", text="测试连接")
self.mqtt_test_status.config(fg="green")
self.mqtt_test_status_text.config(text="已连接")
messagebox.showinfo("成功", "MQTT连接测试成功")
def on_mqtt_test_failure(self, error_msg):
"""MQTT测试失败回调"""
self.mqtt_testing = False
self.test_mqtt_btn.config(state="normal", text="测试连接")
self.mqtt_test_status.config(fg="red")
self.mqtt_test_status_text.config(text="连接失败")
messagebox.showerror("错误", f"MQTT连接测试失败: {error_msg}")
def build_modbus_command(self, address):
"""构建Modbus读取命令"""
command = bytearray([
address, 0x04, 0x05, 0x20, 0x00, 0x02
])
crc = self.calculate_crc16(command)
command.extend([crc & 0xFF, (crc >> 8) & 0xFF])
return command
def parse_sensor_data(self, response):
"""解析传感器返回数据"""
try:
if len(response) >= 9:
if response[0] == int(self.address_var.get()) and response[1] == 0x04:
data_bytes = response[3:7]
concentration = struct.unpack('>i', data_bytes)[0]
return concentration
return None
except Exception as e:
logger.error(f"解析传感器数据错误: {e}")
return None
def read_sensor_data(self):
"""读取传感器数据"""
try:
if not self.serial_port or not self.serial_port.is_open:
return None
address = int(self.address_var.get())
command = self.build_modbus_command(address)
self.serial_port.reset_input_buffer()
self.serial_port.write(command)
time.sleep(0.1)
if self.serial_port.in_waiting > 0:
response = self.serial_port.read(self.serial_port.in_waiting)
concentration = self.parse_sensor_data(response)
if concentration is not None:
self.current_concentration = concentration
self.sensor_responded = True
self.sensor_status.config(fg="green")
self.sensor_status_text.config(text="已连接")
# 更新显示
self.update_display(concentration)
# 添加到历史数据
self.add_to_history(concentration)
# 如果上传功能开启,上传数据
if self.is_uploading and self.mqtt_client:
self.upload_to_mqtt(concentration)
return concentration
else:
self.sensor_responded = False
self.sensor_status.config(fg="red")
self.sensor_status_text.config(text="无响应")
else:
self.sensor_responded = False
self.sensor_status.config(fg="red")
self.sensor_status_text.config(text="无响应")
except Exception as e:
logger.error(f"读取传感器数据错误: {e}")
self.sensor_responded = False
self.sensor_status.config(fg="red")
self.sensor_status_text.config(text="错误")
return None
def update_display(self, concentration):
"""更新显示"""
self.concentration_var.set(str(concentration))
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.update_time_var.set(f"最后更新: {current_time}")
def add_to_history(self, concentration):
"""添加到历史记录"""
current_time = datetime.now().strftime("%H:%M:%S")
history_entry = f"{current_time} - {concentration} ppm\n"
# 添加到文本框开头
self.history_text.insert('1.0', history_entry)
# 限制历史记录条数
lines = self.history_text.get('1.0', 'end').count('\n')
if lines > 10:
self.history_text.delete('10.0', 'end')
def upload_to_mqtt(self, concentration):
"""上传数据到MQTT"""
try:
if self.mqtt_client:
result = self.mqtt_client.publish(
self.mqtt_topic_var.get(),
str(concentration)
)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
self.upload_success = True
self.upload_status.config(fg="green")
self.upload_status_text.config(text="已连接")
else:
self.upload_success = False
self.upload_status.config(fg="red")
self.upload_status_text.config(text="发送失败")
except Exception as e:
logger.error(f"MQTT上传错误: {e}")
self.upload_success = False
self.upload_status.config(fg="red")
self.upload_status_text.config(text="错误")
def connect_mqtt(self):
"""连接MQTT服务器"""
try:
self.mqtt_client = mqtt.Client()
self.mqtt_client.username_pw_set(
self.mqtt_username_var.get(),
self.mqtt_password_var.get()
)
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info("MQTT连接成功")
self.upload_status.config(fg="green")
self.upload_status_text.config(text="已连接")
else:
logger.error(f"MQTT连接失败错误代码: {rc}")
self.upload_status.config(fg="red")
self.upload_status_text.config(text="连接失败")
self.mqtt_client.on_connect = on_connect
self.mqtt_client.connect(
self.mqtt_broker_var.get(),
int(self.mqtt_port_var.get()),
60
)
self.mqtt_client.loop_start()
except Exception as e:
logger.error(f"MQTT连接错误: {e}")
messagebox.showerror("错误", f"MQTT连接失败: {str(e)}")
def disconnect_mqtt(self):
"""断开MQTT连接"""
if self.mqtt_client:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
self.mqtt_client = None
logger.info("MQTT连接已断开")
self.upload_status.config(fg="red")
self.upload_status_text.config(text="未连接")
def toggle_collection(self):
"""切换数据采集状态"""
if not self.is_collecting:
try:
port = self.port_var.get()
if not port:
messagebox.showwarning("警告", "请选择串口号")
return
baudrate = int(self.baudrate_var.get())
self.serial_port = serial.Serial(port, baudrate, timeout=1)
self.is_collecting = True
self.collect_btn.config(text="停止采集")
# 更新状态显示
self.sensor_status.config(fg="green")
self.sensor_status_text.config(text="已连接")
# 启动采集线程
self.collection_thread = threading.Thread(target=self.collection_loop, daemon=True)
self.collection_thread.start()
except Exception as e:
messagebox.showerror("错误", f"打开串口失败: {str(e)}")
self.is_collecting = False
self.collect_btn.config(text="开始采集")
self.sensor_status.config(fg="red")
self.sensor_status_text.config(text="错误")
else:
self.is_collecting = False
self.collect_btn.config(text="开始采集")
if self.serial_port:
self.serial_port.close()
self.serial_port = None
# 更新状态显示
self.sensor_status.config(fg="red")
self.sensor_status_text.config(text="未连接")
def toggle_upload(self):
"""切换数据上传状态"""
if not self.is_uploading:
try:
self.connect_mqtt()
self.is_uploading = True
self.upload_btn.config(text="停止上传")
except Exception as e:
messagebox.showerror("错误", f"启动上传失败: {str(e)}")
self.is_uploading = False
self.upload_btn.config(text="开始上传")
else:
self.is_uploading = False
self.upload_btn.config(text="开始上传")
self.disconnect_mqtt()
def collection_loop(self):
"""数据采集循环"""
while self.is_collecting:
try:
self.read_sensor_data()
interval = int(self.interval_var.get())
time.sleep(interval)
except Exception as e:
logger.error(f"采集循环错误: {e}")
time.sleep(1)
def on_closing(self):
"""程序关闭时的清理工作"""
self.is_collecting = False
self.is_uploading = False
if self.serial_port:
self.serial_port.close()
if self.mqtt_client:
self.disconnect_mqtt()
self.root.destroy()
def main():
root = tk.Tk()
app = SensorMQTTApp(root)
root.protocol("WM_DELETE_WINDOW", app.on_closing)
root.mainloop()
if __name__ == "__main__":
main()