Compare commits

...

10 Commits

9 changed files with 908 additions and 51 deletions

8
command/magick.txt Normal file
View File

@ -0,0 +1,8 @@
Windows11power shell
Set-ExecutionPolicy Bypass -Scope Process -Force
[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]::SecurityProtocol -bor 3072
iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1'))
choco install imagemagick -y
magick --version
magick.exe "output_3s_100x100.mp4" -resize 100x100 -quality 95 frame_%04d.jpg

View File

@ -0,0 +1,14 @@
KERNELDIR := /lib/modules/$(shell uname -r)/build
CURRENT_PATH := $(shell pwd)
obj-m := hello_driver.o
build: kernel_modules
kernel_modules:
$(MAKE) -C $(KERNELDIR) M=$(CURRENT_PATH) modules
$(CROSS_COMPILE)gcc -o app.elf app.c
clean:
$(MAKE) -C $(KERNELDIR) M=$(CURRENT_PATH) clean
rm -rf app.elf

View File

@ -0,0 +1,36 @@
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
uint8_t buffer[512] = { 0 };
int main(int argc, char *argv[])
{
int fd;
int ret;
fd = open(argv[1], O_RDWR);
if (fd < 0) {
printf("open failed!\r\n");
return -1;
}
if (!strcmp("read", argv[2])) {
printf("read data from kernel\r\n");
ret = read(fd, buffer, sizeof(buffer));
printf("ret len:%d data:%s\r\n", ret, buffer);
} else if (!strcmp("write", argv[2])) {
printf("write data to kernel %s len:%ld\r\n", argv[3], strlen(argv[3]));
ret = write(fd, argv[3], strlen(argv[3]));
printf("ret len:%d\r\n", ret);
} else {
printf("please input correct para, read/write\r\n");
}
close(fd);
return 0;
}

View File

@ -0,0 +1,89 @@
#include <linux/types.h>
#include <linux/init.h>
#include <linux/interrupt.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/module.h>
#include <linux/device.h>
#include <linux/cdev.h>
dev_t hello_dev_id;
struct cdev hello_cdev;
int hello_major = 0;
int hello_minor;
uint8_t kernel_buffer[1024] = { 0 };
static struct class *hello_driver_class;
static int hello_driver_open(struct inode *inode, struct file *file)
{
printk("hello_driver_open\r\n");
return 0;
}
static int hello_driver_release(struct inode *inode, struct file *file)
{
printk("hello_driver_release\r\n");
return 0;
}
static ssize_t hello_driver_read(struct file *file, char __user *buffer, size_t size, loff_t *ppos)
{
printk("hello_driver_read size=%ld\r\n", size);
copy_to_user(buffer, kernel_buffer, size);
return size;
}
static ssize_t hello_driver_write(struct file *file, const char __user *buffer, size_t size, loff_t *ppos)
{
printk("hello_driver_write size=%ld\r\n", size);
copy_from_user(kernel_buffer, buffer, size);
return size;
}
static const struct file_operations hello_driver_fops = {
.owner = THIS_MODULE,
.open = hello_driver_open,
.release = hello_driver_release,
.read = hello_driver_read,
.write = hello_driver_write,
};
static int __init hello_driver_init(void)
{
printk("hello_driver_init\r\n");
/* automatically apply for device number */
alloc_chrdev_region(&hello_dev_id, 0, 1, "hello_driver");
hello_major = MAJOR(hello_dev_id);
hello_minor = MINOR(hello_dev_id);
printk("hello_driver: major=%d, minor=%d\r\n", hello_major, hello_minor);
hello_cdev.owner = THIS_MODULE;
cdev_init(&hello_cdev, &hello_driver_fops);
cdev_add(&hello_cdev, hello_dev_id, 1);
// ret = register_chrdev(CHRDEVBASE_MAJOR, "hello_driver", &hello_driver_fops);
/* automatically add device node file */
hello_driver_class = class_create("hello_driver_class");
device_create(hello_driver_class, NULL, hello_dev_id, NULL, "hello_driver_device");
return 0;
}
static void __exit hello_driver_exit(void)
{
printk("hello_driver_exit\r\n");
cdev_del(&hello_cdev);
unregister_chrdev_region(hello_dev_id, 1);
device_destroy(hello_driver_class, hello_dev_id);
class_destroy(hello_driver_class);
// unregister_chrdev(CHRDEVBASE_MAJOR, "hello_driver");
}
module_init(hello_driver_init);
module_exit(hello_driver_exit);
MODULE_LICENSE("GPL");

33
parse/sdio/cmd52.c Normal file
View File

@ -0,0 +1,33 @@
#include "stdio.h"
#include "stdlib.h"
#include "stdint.h"
void main(int argc, char *argv[])
{
uint32_t cmd, resp;
uint32_t addr, data;
while (1) {
printf("Please input cmd52 and response\r\n");
scanf("%x %x", &cmd, &resp);
printf("your input is cmd52:0x%08lX, resp:0x%08lX\r\n", cmd, resp);
/* parse cmd52 */
addr = (cmd >> 9) & 0x1FFFF;
data = (cmd >> 0) & 0xFF;
if (cmd & (1 << 31)) {
printf("write 0x%02X to 0x%06X. ", data, addr);
if (cmd & (1 << 27)) {
printf("with read. ");
} else {
printf("without read. ");
}
} else {
printf("read data from 0x%06X. ", addr);
}
printf("func=%d\r\n", (cmd >> 28) & 0x7);
/* parse response */
addr = (resp >> 8) & 0xFF;
data = (resp >> 0) & 0xFF;
printf("return data is 0x%02lX, status=0x%02lX\r\n\r\n", data, addr);
}
}

View File

@ -149,6 +149,8 @@ class MainWindow(QMainWindow):
self.modbus_worker = None
self.init_ui()
self.init_modbus()
concentration = 0
def init_ui(self):
"""初始化用户界面"""
@ -254,6 +256,7 @@ class MainWindow(QMainWindow):
def update_concentration(self, concentration):
"""更新浓度显示"""
self.concentration = concentration
# 直接显示整数,不加小数位
if license_check():
self.concentration_label.setText(f"{concentration} ppm")
@ -261,9 +264,9 @@ class MainWindow(QMainWindow):
self.concentration_label.setText("许可证无效")
# 根据浓度值改变颜色
if concentration < 100:
if concentration < 100000:
color = "#27ae60" # 绿色
elif concentration < 500:
elif concentration < 500000:
color = "#f39c12" # 橙色
else:
color = "#e74c3c" # 红色
@ -280,6 +283,22 @@ class MainWindow(QMainWindow):
""")
def start_calibration(self):
"""当浓度大于10%时提示用户"""
if self.concentration > 100000:
msg_box = QMessageBox()
msg_box.setWindowTitle("标定操作")
msg_box.setText("当前浓度较高,是否继续标定?")
msg_box.setIcon(QMessageBox.Question)
calibrate_button = msg_box.addButton("继续标定", QMessageBox.AcceptRole)
cancel_button = msg_box.addButton("取消", QMessageBox.RejectRole)
msg_box.setDefaultButton(cancel_button)
msg_box.exec_()
clicked_button = msg_box.clickedButton()
if clicked_button == calibrate_button:
print("用户选择了【进行标定】")
else:
print("用户选择了【取消标定】")
return
"""开始标定"""
if self.modbus_worker and not self.modbus_worker.calibrating:
self.calibrate_button.setEnabled(False)

View File

@ -17,6 +17,9 @@ import subprocess
import platform
import logging
import paho.mqtt.client as mqtt
import json
class ModbusGasAnalyzer:
def __init__(self, root):
self.root = root
@ -33,9 +36,9 @@ class ModbusGasAnalyzer:
except Exception as e:
print(f"设置图标失败: {e}")
# 设置黄金比例窗口大小 (800x495)
# 设置黄金比例窗口大小 (800x480)
window_width = 800
window_height = 495
window_height = 480
self.root.geometry(f"{window_width}x{window_height}")
self.root.configure(bg=self.custom_blue)
@ -59,9 +62,22 @@ class ModbusGasAnalyzer:
"样品信息": "",
"出厂日期": "",
"产品型号": "",
"编号": ""
"编号": "",
"测试单位": "",
"生产单位": "",
"送样单位": "",
}
# 服务器MQTT信息 - 初始化值
self.mqtt_info = {
"服务器地址": "122.112.229.121",
"端口号": "1883",
"主题": "xiaofang/sensor/data",
"用户名": "xiaofang",
"密码": "xiaofang@qwer",
}
self.mqtt_client = None
# 创建页面
self.create_page1()
@ -83,52 +99,70 @@ class ModbusGasAnalyzer:
# 左侧内容
# 测试人员
tk.Label(left_frame, text="测试人员:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
tk.Label(left_frame, text="测试人员:", font=('Arial', 12), bg=self.custom_blue).grid(row=0, column=0, sticky='w', pady=10, padx=(30, 20))
self.tester_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.tester_entry.pack(fill='x', pady=5)
self.tester_entry.grid(row=0, column=1, sticky='ew', pady=10)
# 保留之前填写的信息
if self.user_info["测试人员"]:
self.tester_entry.insert(0, self.user_info["测试人员"])
# 测试时间
tk.Label(left_frame, text="测试时间:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
tk.Label(left_frame, text="测试时间:", font=('Arial', 12), bg=self.custom_blue).grid(row=1, column=0, sticky='w', pady=10, padx=(30, 20))
self.test_time_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.test_time_entry.pack(fill='x', pady=5)
self.test_time_entry.grid(row=1, column=1, sticky='ew', pady=10)
# 自动填入当前时间,但如果已有信息则保留
if self.user_info["测试时间"]:
self.test_time_entry.insert(0, self.user_info["测试时间"])
else:
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.test_time_entry.insert(0, current_time)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.test_time_entry.insert(0, current_time)
# 样品信息
tk.Label(left_frame, text="样品信息:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
tk.Label(left_frame, text="样品信息:", font=('Arial', 12), bg=self.custom_blue).grid(row=2, column=0, sticky='w', pady=10, padx=(30, 20))
self.sample_info_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.sample_info_entry.pack(fill='x', pady=5)
self.sample_info_entry.grid(row=2, column=1, sticky='ew', pady=10)
if self.user_info["样品信息"]:
self.sample_info_entry.insert(0, self.user_info["样品信息"])
# 出厂日期
tk.Label(left_frame, text="出厂日期:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
tk.Label(left_frame, text="出厂日期:", font=('Arial', 12), bg=self.custom_blue).grid(row=3, column=0, sticky='w', pady=10, padx=(30, 20))
self.production_date_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.production_date_entry.pack(fill='x', pady=5)
self.production_date_entry.grid(row=3, column=1, sticky='ew', pady=10)
if self.user_info["出厂日期"]:
self.production_date_entry.insert(0, self.user_info["出厂日期"])
# 产品型号
tk.Label(left_frame, text="产品型号:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
tk.Label(left_frame, text="产品型号:", font=('Arial', 12), bg=self.custom_blue).grid(row=4, column=0, sticky='w', pady=10, padx=(30, 20))
self.model_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.model_entry.pack(fill='x', pady=5)
self.model_entry.grid(row=4, column=1, sticky='ew', pady=10)
if self.user_info["产品型号"]:
self.model_entry.insert(0, self.user_info["产品型号"])
# 编号
tk.Label(left_frame, text="编号:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
tk.Label(left_frame, text="编号:", font=('Arial', 12), bg=self.custom_blue).grid(row=5, column=0, sticky='w', pady=10, padx=(30, 20))
self.serial_number_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.serial_number_entry.pack(fill='x', pady=5)
self.serial_number_entry.grid(row=5, column=1, sticky='ew', pady=10)
if self.user_info["编号"]:
self.serial_number_entry.insert(0, self.user_info["编号"])
# 测试单位
tk.Label(left_frame, text="测试单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=6, column=0, sticky='w', pady=10, padx=(30, 20))
self.test_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.test_institute_entry.grid(row=6, column=1, sticky='ew', pady=10)
if self.user_info["测试单位"]:
self.test_institute_entry.insert(0, self.user_info["测试单位"])
# 生产单位
tk.Label(left_frame, text="生产单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=7, column=0, sticky='w', pady=10, padx=(30, 20))
self.production_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.production_institute_entry.grid(row=7, column=1, sticky='ew', pady=10)
if self.user_info["生产单位"]:
self.production_institute_entry.insert(0, self.user_info["生产单位"])
# 送样单位
tk.Label(left_frame, text="送样单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=8, column=0, sticky='w', pady=10, padx=(30, 20))
self.sample_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.sample_institute_entry.grid(row=8, column=1, sticky='ew', pady=10)
if self.user_info["送样单位"]:
self.sample_institute_entry.insert(0, self.user_info["送样单位"])
# 右侧内容 - 标题
title_label = tk.Label(right_frame, text="HFC-RapidScan多通道\n灭火剂气体快检仪",
font=('Arial', 18, 'bold'), bg=self.custom_blue,
@ -242,12 +276,15 @@ class ModbusGasAnalyzer:
("样品信息:", self.user_info["样品信息"]),
("出厂日期:", self.user_info["出厂日期"]),
("产品型号:", self.user_info["产品型号"]),
("编号:", self.user_info["编号"])
("编号:", self.user_info["编号"]),
("测试单位:", self.user_info["测试单位"]),
("生产单位:", self.user_info["生产单位"]),
("送样单位:", self.user_info["送样单位"]),
]
for label, value in info_items:
label_frame = tk.Frame(info_frame, bg='white')
label_frame.pack(fill='x', pady=8)
label_frame.pack(fill='x', pady=3)
tk.Label(label_frame, text=label, font=('Arial', 12, 'bold'),
bg='white', width=10, anchor='w').pack(side='left')
@ -283,18 +320,103 @@ class ModbusGasAnalyzer:
tk.Button(button_frame, text="保存报告", font=('Arial', 12),
command=self.save_report, bg='blue', fg='white').pack(side='left', padx=20)
tk.Button(button_frame, text="上传到云", font=('Arial', 12),
command=self.upload_mqtt, bg='green', fg='white').pack(side='left', padx=20)
tk.Button(button_frame, text="退出", font=('Arial', 12),
command=self.root.quit, bg='red', fg='white').pack(side='right', padx=20)
# 自动通过MQTT上传到云
self.auto_upload_mqtt_start()
# 自动保存报告
self.save_report()
def create_page4(self):
"""创建第四个页面"""
self.clear_window()
# 创建一个主容器,包含顶部内容区和底部按钮区
container = tk.Frame(self.root, bg=self.custom_blue)
container.pack(fill='both', expand=True, padx=20, pady=20)
# 顶部内容框架
content_frame = tk.Frame(container, bg=self.custom_blue)
content_frame.pack(fill='both', expand=True)
# 左侧区域
left_frame = tk.Frame(content_frame, bg=self.custom_blue)
left_frame.pack(side='left', fill='both', expand=True, padx=(0, 10), pady=(50, 0))
# 右侧区域
right_frame = tk.Frame(content_frame, bg=self.custom_blue)
right_frame.pack(side='right', fill='both', expand=True, padx=(10, 0))
# 左侧内容
# 服务器地址
tk.Label(left_frame, text="服务器地址:", font=('Arial', 12), bg=self.custom_blue).grid(row=0, column=0, sticky='w', pady=15, padx=(30, 20))
self.ip_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.ip_entry.grid(row=0, column=1, sticky='ew', pady=10)
# 保留之前填写的信息
if self.mqtt_info["服务器地址"]:
self.ip_entry.insert(0, self.mqtt_info["服务器地址"])
# 端口号
tk.Label(left_frame, text="端口号:", font=('Arial', 12), bg=self.custom_blue).grid(row=1, column=0, sticky='w', pady=15, padx=(30, 20))
self.port_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.port_entry.grid(row=1, column=1, sticky='ew', pady=10)
# 自动填入当前时间,但如果已有信息则保留
if self.mqtt_info["端口号"]:
self.port_entry.insert(0, self.mqtt_info["端口号"])
# 主题
tk.Label(left_frame, text="主题:", font=('Arial', 12), bg=self.custom_blue).grid(row=2, column=0, sticky='w', pady=15, padx=(30, 20))
self.topic_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.topic_entry.grid(row=2, column=1, sticky='ew', pady=10)
if self.mqtt_info["主题"]:
self.topic_entry.insert(0, self.mqtt_info["主题"])
# 用户名
tk.Label(left_frame, text="用户名:", font=('Arial', 12), bg=self.custom_blue).grid(row=3, column=0, sticky='w', pady=15, padx=(30, 20))
self.user_name_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
self.user_name_entry.grid(row=3, column=1, sticky='ew', pady=10)
if self.mqtt_info["用户名"]:
self.user_name_entry.insert(0, self.mqtt_info["用户名"])
# 密码
tk.Label(left_frame, text="密码:", font=('Arial', 12), bg=self.custom_blue).grid(row=4, column=0, sticky='w', pady=15, padx=(30, 20))
self.user_password_entry = tk.Entry(left_frame, font=('Arial', 12), show="*", width=20)
self.user_password_entry.grid(row=4, column=1, sticky='ew', pady=10)
if self.mqtt_info["密码"]:
self.user_password_entry.insert(0, self.mqtt_info["密码"])
# 右侧内容 - 标题
title_label = tk.Label(right_frame, text="HFC-RapidScan多通道\n灭火剂气体快检仪",
font=('Arial', 18, 'bold'), bg=self.custom_blue,
justify='center')
title_label.pack(pady=40)
# 开始上传按钮
self.upload_mqtt_start_button = tk.Button(right_frame, text="开始上传", font=('Arial', 16, 'bold'),
bg='green', fg='white', width=15, height=2,
command=self.upload_mqtt_start)
self.upload_mqtt_start_button.pack(pady=40)
# 底部按钮框架 - 放在容器的最下方
button_frame = tk.Frame(container, bg=self.custom_blue)
button_frame.pack(side='bottom', fill='x', pady=(20, 10))
tk.Button(button_frame, text="重新测试", font=('Arial', 12),
command=self.back_to_page1, bg='green', fg='white').pack(side='left', padx=50)
tk.Button(button_frame, text="退出", font=('Arial', 12),
command=self.root.quit, bg='red', fg='white').pack(side='right', padx=50)
def save_report(self):
"""保存汇总信息为JPG图片"""
try:
# 创建图片 - 增大高度以容纳所有信息
img_width = 1000
img_height = 800
img_width = 800
img_height = 600
img = Image.new('RGB', (img_width, img_height), color=self.custom_blue)
draw = ImageDraw.Draw(img)
@ -313,7 +435,7 @@ class ModbusGasAnalyzer:
# 绘制标题
title = "HFC-RapidScan多通道灭火剂气体快检仪"
draw.text((img_width//2 - 250, 35), title, fill='black', font=title_font)
draw.text((img_width//2 - 310, 35), title, fill='black', font=title_font)
# 绘制分隔线
draw.line([(50, 100), (img_width-50, 100)], fill='black', width=2)
@ -326,7 +448,10 @@ class ModbusGasAnalyzer:
("样品信息:", self.user_info["样品信息"]),
("出厂日期:", self.user_info["出厂日期"]),
("产品型号:", self.user_info["产品型号"]),
("编号:", self.user_info["编号"])
("编号:", self.user_info["编号"]),
("测试单位:", self.user_info["测试单位"]),
("生产单位:", self.user_info["生产单位"]),
("送样单位:", self.user_info["送样单位"]),
]
for label, value in info_items:
@ -406,7 +531,10 @@ class ModbusGasAnalyzer:
"样品信息": self.sample_info_entry.get(),
"出厂日期": self.production_date_entry.get(),
"产品型号": self.model_entry.get(),
"编号": self.serial_number_entry.get()
"编号": self.serial_number_entry.get(),
"测试单位": self.test_institute_entry.get(),
"生产单位": self.production_institute_entry.get(),
"送样单位": self.sample_institute_entry.get(),
}
# 切换到第二个页面
@ -509,10 +637,13 @@ class ModbusGasAnalyzer:
# 限制浓度范围
if self.concentration < 0:
print(f"DEBUG - : {self.concentration} -> 0")
self.concentration = 0.0
elif self.concentration > 99.990:
self.concentration = 99.990
elif self.concentration > 99.99:
print(f"DEBUG - : {self.concentration} -> 随机数")
self.concentration = round(random.uniform(99.960, 99.990), 3)
else:
print(f"DEBUG - : {self.concentration} normal")
# 记录浓度用于变化检测
current_time = time.time()
self.last_concentrations.append((current_time, self.concentration))
@ -526,12 +657,7 @@ class ModbusGasAnalyzer:
# 检查停止条件 (120秒或浓度稳定)
if self.check_stop_conditions():
if self.average_concentration > 85:
self.show_concentration = round(random.uniform(99.960, 99.990), 3)
elif 73 <= self.average_concentration <= 85:
self.show_concentration = self.average_concentration + 15
else:
self.show_concentration = self.average_concentration
self.show_concentration = self.average_concentration
self.root.after(0, self.create_page3)
break
@ -546,17 +672,7 @@ class ModbusGasAnalyzer:
while self.is_testing:
try:
# 更新浓度显示
if self.concentration > 85:
temp_concentration = round(random.uniform(99.960, 99.990), 3)
print(f"DEBUG - 原始值: {self.concentration} 随机数: {temp_concentration}")
elif 73 <= self.concentration <= 85:
temp_concentration = self.concentration + 15
print(f"DEBUG - 加8: {self.concentration} -> {temp_concentration}")
else:
temp_concentration = self.concentration
print(f"DEBUG - 正常值: {self.concentration} -> {temp_concentration}")
concentration_display = max(0.0, min(temp_concentration, 99.990))
print(f"DEBUG - 最终值: {temp_concentration} -> {concentration_display}")
concentration_display = max(0.0, min(self.concentration, 99.990))
concentration_text = f"{concentration_display:.3f}%"
# 在主线程中更新UI
@ -564,13 +680,11 @@ class ModbusGasAnalyzer:
time.sleep(0.5) # 每0.5秒更新一次UI
except Exception as e:
print(f"DEBUG - 显示浓度失败: {temp_concentration} -> {concentration_display}")
# 打印详细的异常信息
print(f"DEBUG - 异常类型: {type(e).__name__}")
print(f"DEBUG - 异常信息: {str(e)}")
print(f"DEBUG - 异常发生时的变量值:")
print(f" self.concentration: {getattr(self, 'concentration', '未定义')}")
print(f" temp_concentration: {locals().get('temp_concentration', '未定义')}")
print(f" concentration_display: {locals().get('concentration_display', '未定义')}")
# 打印完整的异常堆栈
@ -609,6 +723,138 @@ class ModbusGasAnalyzer:
return False
def upload_mqtt(self):
# 切换到MQTT页面准备将数据上传
self.create_page4()
def upload_mqtt_start(self):
# 保存MQTT信息
self.mqtt_info = {
"服务器地址": self.ip_entry.get(),
"端口号": self.port_entry.get(),
"主题": self.topic_entry.get(),
"用户名": self.user_name_entry.get(),
"密码": self.user_password_entry.get(),
}
# 将数据通过MQTT上传到服务器
print(f"开始上传到MQTT");
"""连接MQTT服务器"""
try:
self.mqtt_client = mqtt.Client()
self.mqtt_client.username_pw_set(
self.mqtt_info["用户名"],
self.mqtt_info["密码"]
)
def on_connect(client, userdata, flags, rc):
if rc == 0:
print(f"MQTT连接成功: {rc}")
else:
messagebox.showerror("错误", f"MQTT连接失败: {rc}")
return
self.mqtt_client.on_connect = on_connect
self.mqtt_client.connect(
self.mqtt_info["服务器地址"],
int(self.mqtt_info["端口号"]),
3
)
self.mqtt_client.loop_start()
except ValueError as e:
# 处理端口号转换错误
messagebox.showerror("错误", f"端口号必须是数字: {str(e)}")
return
except Exception as e:
messagebox.showerror("错误", f"MQTT连接错误: {str(e)}")
return
"""整理MQTT数据"""
mqtt_payload = []
for key, value in self.user_info.items():
mqtt_payload.append(f"{key}: {value}")
mqtt_payload.append(f"浓度: {max(0.0, min(self.show_concentration, 99.990)):.3f}%")
"""上传数据到MQTT"""
try:
if self.mqtt_client:
result = self.mqtt_client.publish(
self.mqtt_info["主题"],
"\r\n".join(mqtt_payload),
qos=1 #服务质量等级
)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
messagebox.showinfo("成功", f"MQTT上传成功")
else:
messagebox.showerror("错误", f"MQTT上传失败: {result}")
return
except Exception as e:
messagebox.showerror("错误", f"MQTT上传错误: {str(e)}")
return
"""断开MQTT连接"""
try:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
self.mqtt_client = None
except Exception as e:
print(f"MQTT断开错误: {e}")
return
def auto_upload_mqtt_start(self):
# 将数据通过MQTT上传到服务器
print(f"自动开始上传到MQTT");
"""连接MQTT服务器"""
try:
self.mqtt_client = mqtt.Client()
self.mqtt_client.username_pw_set(
self.mqtt_info["用户名"],
self.mqtt_info["密码"]
)
def on_connect(client, userdata, flags, rc):
if rc == 0:
print(f"自动上传MQTT连接成功: {rc}")
else:
print(f"自动上传MQTT连接失败: {rc}")
return
self.mqtt_client.on_connect = on_connect
self.mqtt_client.connect(
self.mqtt_info["服务器地址"],
int(self.mqtt_info["端口号"]),
1
)
self.mqtt_client.loop_start()
except ValueError as e:
# 处理端口号转换错误
print(f"自动上传的端口号必须是数字: {str(e)}")
return
except Exception as e:
print(f"自动上传MQTT连接错误: {str(e)}")
return
"""整理MQTT数据"""
mqtt_payload = []
for key, value in self.user_info.items():
mqtt_payload.append(f"{key}: {value}")
mqtt_payload.append(f"浓度: {max(0.0, min(self.show_concentration, 99.990)):.3f}%")
"""上传数据到MQTT"""
try:
if self.mqtt_client:
result = self.mqtt_client.publish(
self.mqtt_info["主题"],
"\r\n".join(mqtt_payload),
qos=1 #服务质量等级
)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
print(f"自动MQTT上传成功")
else:
print(f"自动MQTT上传失败: {result}")
return
except Exception as e:
print(f"自动MQTT上传错误: {str(e)}")
return
"""断开MQTT连接"""
try:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
self.mqtt_client = None
except Exception as e:
print(f"自动MQTT断开错误: {e}")
return
def get_windows_serial_number():
"""
获取Windows系统的唯一序列号

View File

@ -0,0 +1 @@
python.exe .\mp4_2_jpg.py xxx.mp4 -o images -w 480 -ht 272 -q 75 -s 300 -d 2 -c

View File

@ -0,0 +1,411 @@
#!/usr/bin/env python3
"""
MP4 to JPEG Converter with ultra-simplified C array generation
生成极简C语言数组只包含原始数据和偏移地址
"""
import cv2
import os
import argparse
import sys
def align_to(value, alignment):
"""将值对齐到指定的字节边界"""
return ((value + alignment - 1) // alignment) * alignment
def save_as_c_array(images_data, output_c_file, array_name="image_data"):
"""
将图片数据保存为最简化的C语言数组
Args:
images_data: 图片数据字节串列表
output_c_file: 输出C文件路径
array_name: C数组名称前缀
"""
if not images_data:
print("Warning: No image data to save")
return False
print(f"Generating simplified C array file: {output_c_file}")
try:
# 计算总数据大小和偏移地址
total_size = 0
offsets = []
for data in images_data:
data_size = len(data)
aligned_size = align_to(data_size, 8)
offsets.append(total_size)
total_size += aligned_size
print(f" Total images: {len(images_data)}")
print(f" Total data size: {total_size} bytes")
with open(output_c_file, 'w', encoding='utf-8') as f:
# 文件头
f.write("/* Auto-generated by MP4 to JPEG converter */\n")
f.write(f"/* Images: {len(images_data)}, Total size: {total_size} bytes, 8-byte aligned */\n\n")
# 图片数量宏定义
f.write(f"#define {array_name.upper()}_COUNT {len(images_data)}\n\n")
# 原始数据数组
f.write(f"/* All image data (8-byte aligned) */\n")
f.write(f"__attribute((aligned(32))) ATTR_PSRAM_DATA_SECTION uint8_t {array_name}_raw[{total_size}] = {{\n")
# 写入所有图片数据(连续存储)
all_data = bytearray()
for i, data in enumerate(images_data):
data_size = len(data)
aligned_size = align_to(data_size, 8)
# 添加图片数据
all_data.extend(data)
# 添加对齐填充
padding_size = aligned_size - data_size
all_data.extend(b'\x00' * padding_size)
# 写入数据每行16个字节
for i in range(0, len(all_data), 16):
line = " "
for j in range(16):
if i + j < len(all_data):
byte_val = all_data[i + j]
line += f"0x{byte_val:02X}, "
f.write(line + "\n")
f.write("};\n\n")
# 偏移地址数组
f.write(f"/* Array of image data offset */\n")
f.write(f"const uint32_t {array_name}_offset[{array_name.upper()}_COUNT] = {{\n")
# 每行4个偏移值
for i in range(0, len(offsets), 4):
line = " "
for j in range(4):
idx = i + j
if idx < len(offsets):
offset = offsets[idx]
f.write(f"{offset:8d}, /* Image {idx:3d} */\n")
f.write("};\n")
print(f"C array file generated successfully!")
return True
except Exception as e:
print(f"Error generating C array file: {e}")
import traceback
traceback.print_exc()
return False
def save_as_simple_c_array(images_data, output_c_file, array_name="image_data"):
"""
保存为更简单的C数组格式每行一个偏移值
"""
if not images_data:
print("Warning: No image data to save")
return False
print(f"Generating simple C array file: {output_c_file}")
try:
# 计算总数据大小和偏移地址
total_size = 0
offsets = []
for data in images_data:
data_size = len(data)
aligned_size = align_to(data_size, 8)
offsets.append(total_size)
total_size += aligned_size
print(f" Total images: {len(images_data)}")
print(f" Total data size: {total_size} bytes")
with open(output_c_file, 'w', encoding='utf-8') as f:
# 文件头
f.write("/* Auto-generated by MP4 to JPEG converter */\n")
f.write(f"/* Images: {len(images_data)}, Total size: {total_size} bytes */\n\n")
# 图片数量宏定义
f.write(f"#define {array_name.upper()}_COUNT {len(images_data)}\n\n")
# 原始数据数组
f.write(f"__attribute((aligned(32))) ATTR_PSRAM_DATA_SECTION uint8_t {array_name}_raw[{total_size}] = {{\n")
# 写入所有图片数据(连续存储)
all_data = bytearray()
for data in images_data:
data_size = len(data)
aligned_size = align_to(data_size, 8)
# 添加图片数据
all_data.extend(data)
# 添加对齐填充
padding_size = aligned_size - data_size
all_data.extend(b'\x00' * padding_size)
# 写入数据每行16个字节
bytes_written = 0
while bytes_written < len(all_data):
line = " "
for j in range(16):
if bytes_written < len(all_data):
byte_val = all_data[bytes_written]
line += f"0x{byte_val:02X}, "
bytes_written += 1
f.write(line + "\n")
f.write("};\n\n")
# 偏移地址数组
f.write(f"const uint32_t {array_name}_offset[{array_name.upper()}_COUNT] = {{\n")
# 每行一个偏移值
for i, offset in enumerate(offsets):
f.write(f" {offset}, /* Image {i} */\n")
f.write("};\n")
print(f"Simple C array file generated successfully!")
return True
except Exception as e:
print(f"Error generating simple C array file: {e}")
return False
def mp4_to_jpg(video_path, output_dir, width=None, height=None,
quality=95, start_time=0, duration=None, prefix='frame',
generate_c_array=False, c_array_name="image_data", c_array_file=None):
"""
Convert MP4 video to JPEG images with ultra-simplified C array generation
"""
# Create output directory
os.makedirs(output_dir, exist_ok=True)
print(f"Output directory: {output_dir}")
# Check input file
if not os.path.exists(video_path):
print(f"Error: File '{video_path}' does not exist")
return False, []
print(f"Input video: {video_path}")
# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Error: Cannot open video file")
return False, []
# Get video info
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if fps <= 0 or total_frames <= 0:
print("Error: Invalid video file")
cap.release()
return False, []
print(f"Video FPS: {fps:.2f}")
print(f"Total frames: {total_frames}")
# Calculate frame range
start_frame = int(start_time * fps)
if duration is not None:
end_frame = int((start_time + duration) * fps)
end_frame = min(end_frame, total_frames)
else:
end_frame = total_frames
print(f"Start frame: {start_frame}")
print(f"End frame: {end_frame}")
print(f"Frames to extract: {end_frame - start_frame}")
# Set start position
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
# Get original size
original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Original size: {original_width} x {original_height}")
# Determine output size
if width is None and height is None:
output_width, output_height = original_width, original_height
print("Keeping original size")
elif width is not None and height is not None:
output_width, output_height = width, height
print(f"Target size: {output_width} x {output_height}")
else:
# Calculate maintaining aspect ratio
if width is None:
aspect_ratio = original_width / original_height
output_height = height
output_width = int(output_height * aspect_ratio)
else:
aspect_ratio = original_height / original_width
output_width = width
output_height = int(output_width * aspect_ratio)
print(f"Resizing to: {output_width} x {output_height}")
# Validate quality
if quality < 0 or quality > 100:
print(f"Warning: Quality {quality} out of range. Using 95")
quality = 95
print(f"JPEG quality: {quality}")
if generate_c_array:
print(f"Will generate C array with name: {c_array_name}")
# Extract frames
frame_count = 0
saved_count = 0
images_data = [] # 存储图片数据(bytes)
try:
while cap.isOpened():
if start_frame + frame_count >= end_frame:
break
ret, frame = cap.read()
if not ret:
break
# Resize
if (output_width, output_height) != (original_width, original_height):
frame = cv2.resize(frame, (output_width, output_height))
# Save image to file
filename = f"{prefix}_{saved_count:06d}.jpg"
output_path = os.path.join(output_dir, filename)
# 使用imwrite保存图片
success = cv2.imwrite(output_path, frame, [cv2.IMWRITE_JPEG_QUALITY, quality])
if success:
# 读取保存的图片文件数据
try:
with open(output_path, 'rb') as img_file:
img_data = img_file.read()
images_data.append(img_data)
saved_count += 1
if saved_count % 50 == 0:
print(f" Saved {saved_count} images...")
except Exception as e:
print(f" Warning: Failed to read saved image {saved_count}: {e}")
else:
print(f" Warning: Failed to save image {saved_count}")
frame_count += 1
except KeyboardInterrupt:
print("\nInterrupted by user")
except Exception as e:
print(f"Error during extraction: {e}")
finally:
cap.release()
print(f"\nImage extraction completed!")
print(f"Total images saved: {saved_count}")
print(f"Location: {output_dir}")
# 生成C数组文件
if generate_c_array and images_data:
if c_array_file is None:
c_array_file = os.path.join(output_dir, f"{c_array_name}.c")
success = save_as_simple_c_array(images_data, c_array_file, c_array_name)
if success:
print(f"C array generation completed!")
else:
print(f"C array generation failed!")
return saved_count > 0, images_data
def main():
parser = argparse.ArgumentParser(
description='Convert MP4 to JPEG images with ultra-simplified C array generation',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic conversion (no C array)
python mp4_2_jpg.py video.mp4 -o images
# With specific size and quality
python mp4_2_jpg.py video.mp4 -o images -w 640 -ht 480 -q 90
# With time range
python mp4_2_jpg.py video.mp4 -o images -s 30 -d 10 -w 320
# Generate C array file
python mp4_2_jpg.py video.mp4 -o images -w 240 -c -n video_frames
"""
)
# Required argument
parser.add_argument('input', help='Input video file path')
# Optional arguments
parser.add_argument('-o', '--output', default='output_frames',
help='Output directory (default: output_frames)')
parser.add_argument('-w', '--width', type=int,
help='Output image width (pixels)')
parser.add_argument('-ht', '--height', type=int,
help='Output image height (pixels)')
parser.add_argument('-q', '--quality', type=int, default=95,
help='JPEG quality (0-100, default: 95)')
parser.add_argument('-s', '--start', type=float, default=0,
help='Start time in seconds (default: 0)')
parser.add_argument('-d', '--duration', type=float,
help='Duration to extract in seconds')
parser.add_argument('-p', '--prefix', default='frame',
help='Filename prefix (default: frame)')
# C array generation arguments
parser.add_argument('-c', '--c-array', action='store_true',
help='Generate C array file with image data')
parser.add_argument('-n', '--c-name', default='image_data',
help='C array name prefix (default: image_data)')
parser.add_argument('-cf', '--c-file',
help='Output C file path (default: <output_dir>/<c_name>.c)')
# If no args, show help
if len(sys.argv) == 1:
parser.print_help()
return
args = parser.parse_args()
# Call conversion function
success, images_data = mp4_to_jpg(
video_path=args.input,
output_dir=args.output,
width=args.width,
height=args.height,
quality=args.quality,
start_time=args.start,
duration=args.duration,
prefix=args.prefix,
generate_c_array=args.c_array,
c_array_name=args.c_name,
c_array_file=args.c_file
)
if not success:
print("Conversion failed!")
sys.exit(1)
if __name__ == "__main__":
main()